-
Notifications
You must be signed in to change notification settings - Fork 28
Expose streaming API #50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 6 commits
373666a
6e129ec
c0989bf
b37ad44
4b1ee6b
b7098b8
791a7ac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ Bindings to Erlang's built in HTTP client, `httpc`. | |
| ```sh | ||
| gleam add gleam_httpc@5 | ||
| ``` | ||
|
|
||
| ```gleam | ||
| import gleam/http/request | ||
| import gleam/http/response | ||
|
|
@@ -37,6 +38,49 @@ pub fn send_request() { | |
| } | ||
| ``` | ||
|
|
||
| ## Http streaming requests | ||
|
|
||
| `httpc` supports `stream:{self, once}` mode, which is a **pull-based** approach for | ||
| accepting streamed responses. In this mode, after receiving the `handler_pid`, from the | ||
| `StreamStart` message, the caller must explicitly request the next stream message | ||
| using `receive_next_stream_message/1`. | ||
|
|
||
| ```gleam | ||
| import gleam/http.{Get} | ||
| import gleam/http/request | ||
| import gleam/httpc | ||
| import gleam/process | ||
|
|
||
| /// Receive a streamed response from Postman Echo. The number of | ||
| /// stream chunks we receive is 1, as we specfied in the endpoint | ||
| /// | ||
|
||
| pub fn stream_self_once() { | ||
| let req = | ||
| request.new() | ||
| |> request.set_method(Get) | ||
| |> request.set_host("postman-echo.com") | ||
| |> request.set_path("/stream/1") | ||
|
|
||
| // Send the streaming request to the server | ||
| let assert Ok(request_id) = httpc.send_stream_request(req) | ||
|
|
||
| // Configure the selector | ||
| let selector = process.new_selector() |> httpc.select_stream_messages(httpc.raw_stream_mapper()) | ||
|
|
||
| let assert Ok(httpc.StreamStart(_request_id_, _headers, handler_pid)) = | ||
| process.selector_receive(selector, 1000) | ||
|
|
||
| let _nil = httpc.receive_next_stream_message(handler_pid) | ||
| let assert Ok(httpc.StreamChunk(_request_id_, _binary_part)) = | ||
| process.selector_receive(selector, 1000) | ||
|
|
||
| let _nil = httpc.receive_next_stream_message(handler_pid) | ||
| let assert Ok(httpc.StreamEnd(_request_id_, _headers)) = | ||
| process.selector_receive(selector, 1000) | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| ## Use with Erlang/OTP versions older than 26.0 | ||
|
|
||
| Older versions of HTTPC do not verify TLS connections by default, so with them | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,9 @@ | ||
| import gleam/bit_array | ||
|
|
||
| import gleam/dynamic.{type Dynamic} | ||
| import gleam/erlang/atom | ||
| import gleam/erlang/charlist.{type Charlist} | ||
| import gleam/erlang/process | ||
| import gleam/http.{type Method} | ||
| import gleam/http/request.{type Request} | ||
| import gleam/http/response.{type Response, Response} | ||
|
|
@@ -38,9 +41,19 @@ type BodyFormat { | |
| Binary | ||
| } | ||
|
|
||
| type Destination { | ||
| Self | ||
| } | ||
|
|
||
| type Mode { | ||
| Once | ||
| } | ||
|
|
||
| type ErlOption { | ||
| BodyFormat(BodyFormat) | ||
| SocketOpts(List(SocketOpt)) | ||
| Sync(Bool) | ||
| Stream(#(Destination, Mode)) | ||
| } | ||
|
|
||
| type SocketOpt { | ||
|
|
@@ -59,6 +72,24 @@ type ErlVerifyOption { | |
| VerifyNone | ||
| } | ||
|
|
||
| pub type HttpSocket | ||
|
||
|
|
||
| pub type RequestIdentifier | ||
lpil marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| pub type RawStreamMessage { | ||
| RawStreamStart(RequestIdentifier, List(#(Charlist, Charlist)), process.Pid) | ||
| RawStreamChunk(RequestIdentifier, BitArray) | ||
| RawStreamEnd(RequestIdentifier, List(#(Charlist, Charlist))) | ||
| RawStreamError(RequestIdentifier, HttpError) | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks like it is an implementation detail and should not be in part of the public interface?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it's an implementation detail. I've refactored the code to make it private, and, as a consequence, I was able to simplify the configuration of the selector that receives stream messages (See
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How come we construct values of this type and then immediately convert to another type? Why not construct that final type and skip this intermediate one? |
||
|
|
||
| pub type StreamMessage { | ||
| StreamStart(RequestIdentifier, List(#(String, String)), process.Pid) | ||
| StreamChunk(RequestIdentifier, BitArray) | ||
| StreamEnd(RequestIdentifier, List(#(String, String))) | ||
| StreamError(RequestIdentifier, HttpError) | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you document this type and add labels please 🙏
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
|
||
| @external(erlang, "httpc", "request") | ||
| fn erl_request( | ||
| a: Method, | ||
|
|
@@ -81,6 +112,22 @@ fn erl_request_no_body( | |
| Dynamic, | ||
| ) | ||
|
|
||
| @external(erlang, "httpc", "request") | ||
| fn erl_stream_request_no_body( | ||
| a: Method, | ||
| b: #(Charlist, List(#(Charlist, Charlist))), | ||
| c: List(ErlHttpOption), | ||
| d: List(ErlOption), | ||
| ) -> Result(RequestIdentifier, Dynamic) | ||
|
|
||
| @external(erlang, "httpc", "request") | ||
| fn erl_stream_request( | ||
| a: Method, | ||
| b: #(Charlist, List(#(Charlist, Charlist)), Charlist, BitArray), | ||
| c: List(ErlHttpOption), | ||
| d: List(ErlOption), | ||
| ) -> Result(RequestIdentifier, Dynamic) | ||
|
|
||
| fn string_header(header: #(Charlist, Charlist)) -> #(String, String) { | ||
| let #(k, v) = header | ||
| #(charlist.to_string(k), charlist.to_string(v)) | ||
|
|
@@ -98,6 +145,124 @@ pub fn send_bits( | |
| |> dispatch_bits(req) | ||
| } | ||
|
|
||
| /// Send a HTTP stream request of binary data | ||
| /// | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you finish this documentation please 🙏
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
| pub fn dispatch_stream_bits( | ||
| config: Configuration, | ||
| req: Request(BitArray), | ||
| ) -> Result(RequestIdentifier, HttpError) { | ||
| let erl_url = | ||
| req | ||
| |> request.to_uri | ||
| |> uri.to_string | ||
| |> charlist.from_string | ||
| let erl_headers = prepare_headers(req.headers) | ||
| let erl_http_options = [ | ||
| Autoredirect(config.follow_redirects), | ||
| Timeout(config.timeout), | ||
| ] | ||
| let erl_http_options = case config.verify_tls { | ||
| True -> erl_http_options | ||
| False -> [Ssl([Verify(VerifyNone)]), ..erl_http_options] | ||
| } | ||
| let erl_options = [ | ||
| BodyFormat(Binary), | ||
| SocketOpts([Ipfamily(Inet6fb4)]), | ||
| Sync(False), | ||
| Stream(#(Self, Once)), | ||
| ] | ||
| use request_id <- result.try( | ||
| case req.method { | ||
| http.Options | http.Head | http.Get -> { | ||
| let erl_req = #(erl_url, erl_headers) | ||
| erl_stream_request_no_body( | ||
| req.method, | ||
| erl_req, | ||
| erl_http_options, | ||
| erl_options, | ||
| ) | ||
| } | ||
| _ -> { | ||
| let erl_content_type = | ||
| req | ||
| |> request.get_header("content-type") | ||
| |> result.unwrap("application/octet-stream") | ||
| |> charlist.from_string | ||
| let erl_req = #(erl_url, erl_headers, erl_content_type, req.body) | ||
| erl_stream_request(req.method, erl_req, erl_http_options, erl_options) | ||
| } | ||
| } | ||
| |> result.map_error(normalise_error), | ||
| ) | ||
|
|
||
| Ok(request_id) | ||
| } | ||
|
|
||
| /// Triggers the next asynchronous streaming message to be sent to the calling process | ||
|
||
| /// designated by `pid`. | ||
|
||
| /// | ||
| @external(erlang, "gleam_httpc_ffi", "receive_next_stream_message") | ||
| pub fn receive_next_stream_message(id: process.Pid) -> Nil | ||
|
|
||
| @external(erlang, "gleam_httpc_ffi", "coerce_stream_message") | ||
| fn decode_stream_message(msg: Dynamic) -> RawStreamMessage | ||
|
|
||
| /// Configure a selector to receive stream messages | ||
| /// | ||
| /// Note this will receive messages from all processes that sent a HTTP stream request; | ||
| /// for example using `send_stream_request`, rather than any specific one. | ||
| /// In this case, for finer grained processing, you can filter on the `RequestIdentifier`, | ||
| /// which is the first argument in the `StreamMessage` constructor. | ||
| /// If you wish to only handle stream messages from one process, then use one | ||
| /// process per HTTP stream request. | ||
| /// | ||
| /// ## Example | ||
| /// | ||
| /// ```gleam | ||
| /// process.new_selector() |> select_stream_messages(raw_stream_mapper()) | ||
| /// ``` | ||
| /// | ||
| pub fn select_stream_messages( | ||
| selector: process.Selector(t), | ||
| mapper: fn(RawStreamMessage) -> t, | ||
| ) -> process.Selector(t) { | ||
| let http = atom.create(http.scheme_to_string(http.Http)) | ||
| let map_stream_message = fn(mapper) { | ||
| fn(message) { mapper(decode_stream_message(message)) } | ||
| } | ||
|
|
||
| selector | ||
| |> process.select_record(http, 1, map_stream_message(mapper)) | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have this take the selector as an argument, to match all other selecting code. Forcing a new selector means the programmer cannot use this library any time that they need to use an existing selector.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, your mug example (i.e. |
||
|
|
||
| /// Converts a raw stream message into a user-facing `StreamMessage`. | ||
| /// | ||
| /// This mapper is primarily used to transform header values from | ||
| /// `List(#(Charlist, Charlist))` into the more idiomatic `List(#(String, String))`, | ||
| /// which is easier to work with in Gleam. | ||
| /// | ||
| /// You can use this function as the `mapper` argument to `select_stream_messages/2`, | ||
| /// or you can supply your own custom mapper if you need additional transformations. | ||
| /// | ||
| /// ## Example | ||
| /// | ||
| /// ```gleam | ||
| /// process.new_selector() |> select_stream_messages(raw_stream_mapper()) | ||
| /// ``` | ||
| /// | ||
| pub fn raw_stream_mapper() -> fn(RawStreamMessage) -> StreamMessage { | ||
|
||
| fn(msg: RawStreamMessage) { | ||
| case msg { | ||
| RawStreamChunk(request_id, bin_part) -> StreamChunk(request_id, bin_part) | ||
| RawStreamStart(request_id, headers, pid) -> | ||
| StreamStart(request_id, list.map(headers, string_header), pid) | ||
| RawStreamEnd(request_id, headers) -> | ||
| StreamEnd(request_id, list.map(headers, string_header)) | ||
| RawStreamError(request_id, reason) -> StreamError(request_id, reason) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // TODO: refine error type | ||
| /// Send a HTTP request of binary data. | ||
| /// | ||
|
|
@@ -141,6 +306,7 @@ pub fn dispatch_bits( | |
| ) | ||
|
|
||
| let #(#(_version, status, _status), headers, resp_body) = response | ||
|
|
||
| Ok(Response(status, list.map(headers, string_header), resp_body)) | ||
| } | ||
|
|
||
|
|
@@ -177,7 +343,6 @@ pub opaque type Configuration { | |
| /// - Redirects are not followed. | ||
| /// - The timeout for the response to be received is 30 seconds from when the | ||
| /// request is sent. | ||
| /// | ||
| pub fn configure() -> Configuration { | ||
| Builder(verify_tls: True, follow_redirects: False, timeout: 30_000) | ||
| } | ||
|
|
@@ -209,7 +374,7 @@ pub fn timeout(config: Configuration, timeout: Int) -> Configuration { | |
| Builder(..config, timeout:) | ||
| } | ||
|
|
||
| /// Send a HTTP request of unicode data. | ||
| /// Send a synchronus HTTP request of unicode data. | ||
|
||
| /// | ||
| pub fn dispatch( | ||
| config: Configuration, | ||
|
|
@@ -224,7 +389,58 @@ pub fn dispatch( | |
| } | ||
| } | ||
|
|
||
| // TODO: refine error type | ||
| /// Send a HTTP stream request of unicode data using a custom `Configuration`. | ||
| /// | ||
| /// This function supports only the `stream: {self, once}` mode from `httpc`, which is a | ||
| /// **pull-based** streaming approach. In this mode, the caller must explicitly request | ||
| /// the next stream message using `receive_next_stream_message/1`. | ||
| /// | ||
| /// If the request is successfully dispatched, this function returns a `RequestIdentifier`. | ||
| /// This identifier is useful when managing multiple concurrent streaming requests, | ||
| /// allowing you to match incoming messages to the originating request. | ||
| /// | ||
| /// Once you've configured a selector to receive stream messages (see `select_stream_messages/1`), | ||
| /// the other `StreamMessage` variants will be delivered to the user | ||
| /// | ||
| /// With the exception of timeout errors, all other errors will be delivered via: | ||
| /// `StreamError(RequestIdentifier, HttpError)`. | ||
| /// | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a complex API, so folks will need examples to understand how to use it. Could you add these please 🙏
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added a thorough example to the README
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also added a simpler example to the documentation above |
||
| pub fn dispatch_stream_request( | ||
| config: Configuration, | ||
| request: Request(String), | ||
| ) -> Result(RequestIdentifier, HttpError) { | ||
| let request = request.map(request, bit_array.from_string) | ||
| use request_id <- result.try(dispatch_stream_bits(config, request)) | ||
| Ok(request_id) | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is named as if it works with strings, but the body type returned is a bit array. Maybe we don't need this version and just working always with bit arrays is fine.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's always work with bits and not have a string specialised version, seeing as it is not possible to ensure that the response is a string, as the other string versions do. |
||
|
|
||
| /// Sends an HTTP streaming request with a Unicode body using the default `Configuration`. | ||
| /// | ||
| /// This function supports only the `stream: {self, once}` mode from `httpc`, which is a | ||
| /// **pull-based** streaming approach. In this mode, after receiving the `handler_pid`, from the | ||
| /// `StreamStart` message, the caller must explicitly request the next stream message | ||
| /// using `receive_next_stream_message/1`.the caller must explicitly request | ||
| /// | ||
| /// If the request is successfully dispatched, this function returns a `RequestIdentifier`. | ||
| /// This identifier is useful when managing multiple concurrent streaming requests, | ||
| /// allowing you to match incoming messages to the originating request. | ||
| /// | ||
| /// Once you've configured a selector to receive stream messages (see `select_stream_messages/1`), | ||
| /// the other `StreamMessage` variants will be delivered to the user | ||
| /// | ||
| /// With the exception of timeout errors, all other errors will be delivered via: | ||
| /// `StreamError(RequestIdentifier, HttpError)`. | ||
| /// | ||
| /// If you want to customize the streaming behavior, use `dispatch_stream_request/2` | ||
| /// with a custom `Configuration` instead. | ||
| /// | ||
| pub fn send_stream_request( | ||
| req: Request(String), | ||
| ) -> Result(RequestIdentifier, HttpError) { | ||
| configure() | ||
| |> dispatch_stream_request(req) | ||
| } | ||
|
|
||
|
||
| /// Send a HTTP request of unicode data using the default configuration. | ||
| /// | ||
| /// If you wish to use some other configuration use `dispatch` instead. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,22 @@ | ||
| -module(gleam_httpc_ffi). | ||
| -export([default_user_agent/0, normalise_error/1]). | ||
| -export([default_user_agent/0, normalise_error/1, receive_next_stream_message/1, coerce_stream_message/1]). | ||
|
|
||
| %%==================================================================== | ||
| %% Streaming | ||
| %%==================================================================== | ||
| %% Helper: call stream_next with whatever the handler expects | ||
| receive_next_stream_message(HandlerPid) when is_pid(HandlerPid) -> | ||
| httpc:stream_next(HandlerPid), | ||
| nil. | ||
|
|
||
| coerce_stream_message({http, {ReqId, stream_start, Headers, Pid}}) -> {raw_stream_start, ReqId, Headers, Pid}; | ||
| coerce_stream_message({http, {ReqId, stream, BinBodyPart}}) when is_binary(BinBodyPart) -> {raw_stream_chunk, ReqId, BinBodyPart}; | ||
| coerce_stream_message({http, {ReqId, stream_end, Headers}}) -> {raw_stream_end, ReqId, Headers}; | ||
| coerce_stream_message({http, {ReqId, {error, Reason}}}) -> {raw_stream_error, ReqId, normalise_error(Reason)}. | ||
|
||
|
|
||
| %%==================================================================== | ||
| %% Error normalization | ||
| %%==================================================================== | ||
|
|
||
| normalise_error(Error = {failed_connect, Opts}) -> | ||
| Ipv6 = case lists:keyfind(inet6, 1, Opts) of | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's this
stream:{self, once}syntax? I don't recognise itThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's Erlang speak :-) Replaced with `Stream(#(Self, Once))