Skip to content

Expose streaming API#50

Draft
adkelley wants to merge 7 commits intogleam-lang:mainfrom
adkelley:main
Draft

Expose streaming API#50
adkelley wants to merge 7 commits intogleam-lang:mainfrom
adkelley:main

Conversation

@adkelley
Copy link

@adkelley adkelley commented Sep 16, 2025

Expose Streaming API

Motivation

Erlang's inets application provides a set of Internet-related services, including the HTTP client httpc. The httpc module gives users the option to stream the body of a 200 or 206 response either to the calling process or directly to a file.

This pull request exposes that option to gleam_httpc while maintaining full backward compatibility. It supports the stated use case stream:{self, once}, (i.e., streaming chunks) represented by the custom type Stream:

type Destination {
  Self
}

type Mode {
  Once
}

type ErlOption {
  ...
  Stream(#(Destination, Mode))
}

None of the other streaming APIs from the Erlang httpc module is supported (e.g., stream:{filename}).

Implementation Details

Several functions that have synchronous equivalents were added to support streaming, including:

  • dispatch_stream_bits(config: Configuration, req: Request(BitArray)) - similar to dispatch_bits function, send a HTTP stream request of binary data
  • dispatch_stream_request(config: Configuration, req: Request(String)) - similar to thedispatch function, send a HTTP stream request of unicode data
  • send_stream_request(req: Request(String) - similar to the send function, send an HTTP stream request of Unicode data using the default configuration.

The function select_stream_messages configures a selector to receive stream messages. Finally, the function receive_next_stream_message(pid: Pid) triggers the next streaming message to be sent to the calling process designated by pid.

Testing

Added two new unit tests, send_stream_request_test, and dispatch_stream_request_test using the postman streamed response api

Documentation

  • Updated README.md with a stream:{self, once} example
  • Added inline documentation to new public functions

Related Issues

Erlang's httpc module supports streaming the body of 200 or 206
responses to the calling process or to a file.  These changes expose
this API.
Copy link
Member

@lpil lpil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! I've left a bunch of notes inline, let me know if any of it is unclear.

Streaming chunks is the stated use case. By limiting support to this API
(i.e, stream:self, once} it removes considerable complexity and enables
deterministic API responses that can be correctly typed in gleam
Copy link
Member

@lpil lpil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking a lot nicer, thank you!

It still needs to be changed to use selectors rather than a custom non-composable API for receiving messages. Check out Mug's https://hexdocs.pm/mug/mug.html#receive_next_packet_as_message and https://hexdocs.pm/mug/mug.html#select_tcp_messages for how this can be done.

The new message types are not very clear due to using tuples instead of a descriptive custom type, and the use of charlists makes it difficult for the programmer to use as they have to know how to convert them to strings. Make a custom type for the response messages please, one designed with the Gleam programmer in mind rather than being more faithful to the Erlang APIs are we building on top of 🙏

Thanks again!

Erlang's httpc module supports streaming the body of 200 or 206
responses to the calling process or to a file.  These changes expose the
stream:{self, once} API, the stated use case.

Resolves Issue Expose streaming API gleam-lang#31
@adkelley adkelley force-pushed the main branch 2 times, most recently from 37a29e3 to f7b038a Compare September 22, 2025 23:12
Copy link
Member

@lpil lpil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lovely work! This is def getting where we want it to be. I've left more notes inline, thank you

/// If you wish to only handle stream messages from one process, then use one
/// process per asychronous HTTP request.
///
pub fn initialize_stream_selector() -> process.Selector(StreamMessage) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the same design as found in the other packages please, so take the selector as an argument, take a mapping function as an argument, and use the naming convention they use. The TCP library is a good reference here.

If you cannot use an existing selector then you would not be able to use it with code that already has a selector (so most actor based code).

if you do not have a mapping function then you can't produce selectors of different types, which means you cannot easily select other types of messages with the selector.

And the naming convention is just to be consistent and to make the API easy to approach for people who have learnt about how to use selectors already.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, agreed. Revised.

Erlang's httpc module supports streaming the body of 200 or 206
responses to the calling process or to a file.  These changes expose the
stream:{self, once} API, the stated use case.

Resolves Issue Expose streaming API gleam-lang#31
@adkelley
Copy link
Author

adkelley commented Oct 7, 2025

Thanks again for your previous comments and suggestions. Is there anything else you'd like to see before accepting this pull request?

Copy link
Member

@lpil lpil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! Nearly there, left some API related notes inline

VerifyNone
}

pub type HttpSocket
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used anywhere?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch - removed.

RawStreamChunk(RequestIdentifier, BitArray)
RawStreamEnd(RequestIdentifier, List(#(Charlist, Charlist)))
RawStreamError(RequestIdentifier, HttpError)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it is an implementation detail and should not be in part of the public interface?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's an implementation detail. I've refactored the code to make it private, and, as a consequence, I was able to simplify the configuration of the selector that receives stream messages (See pub fn select_stream_messages() -> process.Selector(StreamMessage))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come we construct values of this type and then immediately convert to another type? Why not construct that final type and skip this intermediate one?

StreamChunk(RequestIdentifier, BitArray)
StreamEnd(RequestIdentifier, List(#(String, String)))
StreamError(RequestIdentifier, HttpError)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you document this type and add labels please 🙏

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

/// Send a HTTP stream request of binary data
///
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you finish this documentation please 🙏

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

/// Send a HTTP request of unicode data.
/// Send a synchronus HTTP request of unicode data.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this please 🙏

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

configure()
|> dispatch_stream_request(req)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's have just the dispatch function 🙏

Copy link
Author

@adkelley adkelley Nov 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain more about this? Are you suggesting that I remove pub fn send_stream_request(req: Request(String)) -> Result(RequestIdentifier, HttpError), leaving dispatch_stream_request whenever streaming?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes please

///
/// With the exception of timeout errors, all other errors will be delivered via:
/// `StreamError(RequestIdentifier, HttpError)`.
///
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a complex API, so folks will need examples to understand how to use it. Could you add these please 🙏

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a thorough example to the README

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also added a simpler example to the documentation above dispatch_stream_request and send_stream_request functions.

process.selector_receive(selector, 1000)
assert request_id_ == request_id

let _nil = httpc.receive_next_stream_message(pid)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the let nil_ = prefixes please, including from the documentation

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Member

@lpil lpil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sorry! I wrote some review comments and then didn't press submit for some reason. Sorry about that

README.md Outdated

## Http streaming requests

`httpc` supports `stream:{self, once}` mode, which is a **pull-based** approach for
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this stream:{self, once} syntax? I don't recognise it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's Erlang speak :-) Replaced with `Stream(#(Self, Once))

README.md Outdated

/// Receive a streamed response from Postman Echo. The number of
/// stream chunks we receive is 5, as we specfied by the endpoint
///
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the reference to postman please

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

README.md Outdated
// Configure the selector
let selector = httpc.select_stream_messages()
let assert Ok(chunks) =
loop(request_id, selector, process.self(), bit_array.from_string(""))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not use raw looping please, we don't want people to do that when they should be using an actor

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revised to use the Actor model. This required a good deal of thought, but it certainly helped me to better understand your approach to OTP.

let selector = httpc.select_stream_messages()
let assert Ok(_request_id) = httpc.send_stream_request(req)
let assert Ok(httpc.StreamError(_request_id, httpc.SocketClosedRemotely)) =
process.selector_receive(selector, 1000)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use assert process.selector_receive(selector, 1000) == Ok(httpc.StreamError(request_id, httpc.SocketClosedRemotely)) please 🙏

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

coerce_stream_message({http, {ReqId, stream_start, Headers, Pid}}) -> {raw_stream_start, ReqId, Headers, Pid};
coerce_stream_message({http, {ReqId, stream, BinBodyPart}}) when is_binary(BinBodyPart) -> {raw_stream_chunk, ReqId, BinBodyPart};
coerce_stream_message({http, {ReqId, stream_end, Headers}}) -> {raw_stream_end, ReqId, Headers};
coerce_stream_message({http, {ReqId, {error, Reason}}}) -> {raw_stream_error, ReqId, normalise_error(Reason)}.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrap these long lines before 80 columns please

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

///
pub type StreamMessage {
/// Sent exactly once when the server response begins. The returned `pid`
/// identifies the process and must be passed to `httpc:stream_next`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No Erlang syntax or explaining Erlang APIs in Gleam documentation please.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revised to refer to the Gleam function

let request = request.map(request, bit_array.from_string)
use request_id <- result.try(async_dispatch_bits(config, request))
Ok(request_id)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's always work with bits and not have a string specialised version, seeing as it is not possible to ensure that the response is a string, as the other string versions do.

)
/// Sent for every chunk of response data that the worker emits. Each chunk
/// must be explicitly requested by calling `httpc:stream_next/1` with the pid
/// supplied in `StreamStart`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the documentation of Erlang internals please. This is documentation for users, not maintainers.

pub type StreamMessage {
/// Sent exactly once when the server response begins. The returned `pid`
/// identifies the process and must be passed to `httpc:stream_next`
/// whenever you are ready for the next chunk.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this process? Is it some internal detail of Erlang's HTTPC? Is it a process the user creates? What happens if we pass a different pid to the stream function?

Copy link
Author

@adkelley adkelley Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question! According to the API documentation, you're supposed to use this pid in stream_next_message. However, I tested a different pid (process.self()) in an example, and it still worked correctly. I suggest we stick with the documented behavior (See https://www.erlang.org/doc/apps/inets/httpc.html#request/5)

RawStreamChunk(RequestIdentifier, BitArray)
RawStreamEnd(RequestIdentifier, List(#(Charlist, Charlist)))
RawStreamError(RequestIdentifier, HttpError)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come we construct values of this type and then immediately convert to another type? Why not construct that final type and skip this intermediate one?

@lpil lpil marked this pull request as draft December 11, 2025 18:51
@adkelley
Copy link
Author

adkelley commented Dec 17, 2025

In the latest version, I leave it to the user to either work with RawStreamMessages or convert them to StreamMessages. I've provided a convenience function to perform this transformation. The only difference is Charlist vs. String headers, which are easier to work with in Gleam. If you feel we should always work with StreamMessage then I'll change it.

* Updated README with a streaming example

* Added error case to streaming tests

* Removed send_stream from httpc and improved function documentation
@CrowdHailer
Copy link
Contributor

I think it's possible for a non streaming request to be returned even when streaming.
When I make a request to ollama that results in a 400 I get a match error.

I added this line to the end of coerce_stream_message

coerce_stream_message(Any) ->io:format("X is: ~p~n", [Any]). 

And get this output.

X is: {http,{#Ref<0.1590068022.2841640966.62230>,
             {{"HTTP/1.1",400,"Bad Request"},
              [{"connection","close"},
               {"date","Wed, 18 Feb 2026 19:40:44 GMT"},
               {"content-length","136"},
               {"content-type","application/json; charset=utf-8"}],
              <<"{\"error\":\"json: cannot unmarshal string into Go struct field ToolFunction.tools.function.parameters of type api.ToolFunctionParameters\"}">>}}}

@Munksgaard
Copy link

What's the status on this PR? Streaming seems like something that'd be nice to have :-)

@jtdowney
Copy link

@adkelley I noticed this PR is still marked as a draft

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants