ExOpenAI supports streaming responses from OpenAI's API, which is particularly useful for chat and completion endpoints. This guide explains how to use streaming effectively in your applications.
ExOpenAI provides two methods for handling streaming responses:
- Callback Function - Pass a function that processes each chunk as it arrives
- Streaming Client - Create a dedicated process to handle the stream
The simplest way to handle streaming is to pass a callback function to the stream_to parameter:
callback = fn
  :finish -> IO.puts "Stream finished"
  {:data, data} -> IO.puts "Received data: #{inspect(data)}"
  {:error, err} -> IO.puts "Error: #{inspect(err)}"
end

ExOpenAI.Completions.create_completion(
  "gpt-3.5-turbo-instruct",
  "Tell me a story about a robot",
  stream: true,
  stream_to: callback
)

The API call itself returns an async stream reference:
{:ok, stream_ref} =
  ExOpenAI.Completions.create_completion(
    "gpt-3.5-turbo-instruct",
    "Tell me a story about a robot",
    stream: true,
    stream_to: callback
  )

The callback function will be called with:
- {:data, data} for each chunk of data received
- {:error, error} if an error occurs
- :finish when the stream completes
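Because the callback is a plain closure, one way to keep the received chunks is to capture the pid of an Agent and append to it from each clause. The following is a minimal sketch under stated assumptions: `StreamBuffer` and its functions are hypothetical names that are not part of ExOpenAI, and the stream is simulated by calling the callback directly with the three event shapes listed above.

```elixir
defmodule StreamBuffer do
  # Hypothetical helper, not part of ExOpenAI: stores streamed events in an Agent.

  def start do
    Agent.start_link(fn -> %{chunks: [], status: :streaming} end)
  end

  # Builds a callback with one clause per event shape described above.
  def callback(agent) do
    fn
      {:data, data} ->
        # Prepend for O(1) updates; the order is restored in result/1.
        Agent.update(agent, fn s -> %{s | chunks: [data | s.chunks]} end)

      {:error, err} ->
        Agent.update(agent, fn s -> %{s | status: {:error, err}} end)

      :finish ->
        # Keep an earlier error status instead of overwriting it.
        Agent.update(agent, fn
          %{status: :streaming} = s -> %{s | status: :finished}
          s -> s
        end)
    end
  end

  # Returns the status and the chunks in arrival order.
  def result(agent) do
    Agent.get(agent, fn s -> {s.status, Enum.reverse(s.chunks)} end)
  end
end

# Simulated events stand in for a real stream here.
{:ok, agent} = StreamBuffer.start()
callback = StreamBuffer.callback(agent)
Enum.each([{:data, "Once"}, {:data, " upon"}, :finish], callback)
{status, chunks} = StreamBuffer.result(agent)
IO.inspect({status, chunks})
```

In a real call, the value returned by `StreamBuffer.callback(agent)` would be passed as `stream_to:`; the simulated payloads are strings only for brevity, while real payloads are maps.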
For more complex applications, you can create a dedicated process to handle the stream:
- First, create a module that implements the ExOpenAI.StreamingClient behaviour:
defmodule MyStreamingClient do
  use ExOpenAI.StreamingClient

  @impl true
  def handle_data(data, state) do
    IO.puts("Received data: #{inspect(data)}")
    # Process the data chunk
    {:noreply, state}
  end

  @impl true
  def handle_error(error, state) do
    IO.puts("Error: #{inspect(error)}")
    # Handle the error
    {:noreply, state}
  end

  @impl true
  def handle_finish(state) do
    IO.puts("Stream finished")
    # Clean up or finalize processing
    {:noreply, state}
  end
end

- Then, start the client and pass its PID to the API call:
{:ok, pid} = MyStreamingClient.start_link(initial_state)

{:ok, stream_ref} =
  ExOpenAI.Chat.create_chat_completion(
    messages,
    "gpt-4",
    stream: true,
    stream_to: pid
  )

Here's a more complete example of using streaming to build a simple chat interface:
defmodule ChatInterface do
  use ExOpenAI.StreamingClient

  def start_chat(initial_prompt) do
    {:ok, pid} = __MODULE__.start_link(%{buffer: "", complete_message: ""})

    messages = [
      %ExOpenAI.Components.ChatCompletionRequestUserMessage{
        role: :user,
        content: initial_prompt
      }
    ]

    ExOpenAI.Chat.create_chat_completion(
      messages,
      "gpt-4",
      stream: true,
      stream_to: pid
    )

    pid
  end

  @impl true
  def handle_data(data, state) do
    # Extract the content from the delta if it exists.
    # Streamed chunks arrive as maps with atom keys, so match on atoms.
    content =
      case data do
        %{choices: [%{delta: %{content: content}}]} when is_binary(content) ->
          content

        _ ->
          ""
      end

    # Update the buffer and print the new content
    if content != "" do
      IO.write(content)

      {:noreply,
       %{state | buffer: state.buffer <> content, complete_message: state.complete_message <> content}}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_error(error, state) do
    IO.puts("\nError: #{inspect(error)}")
    {:noreply, state}
  end

  @impl true
  def handle_finish(state) do
    IO.puts("\n\nChat response complete.")
    # The complete message is now available in state.complete_message
    {:noreply, state}
  end

  # Function to get the complete message after streaming is done
  def get_complete_message(pid) do
    :sys.get_state(pid).complete_message
  end
end

Usage:
# Start a chat
pid = ChatInterface.start_chat("Tell me about quantum computing")

# After the stream completes, get the full message
complete_response = ChatInterface.get_complete_message(pid)

Most OpenAI endpoints that support streaming work in a similar way, but the structure of the streamed data may differ:
ExOpenAI.Chat.create_chat_completion(
  messages,
  "gpt-4",
  stream: true,
  stream_to: callback_or_pid
)

The streamed data will contain delta updates to the assistant's message.
ExOpenAI.Completions.create_completion(
  "gpt-3.5-turbo-instruct",
  prompt,
  stream: true,
  stream_to: callback_or_pid
)

The streamed data will contain text fragments.
- Type information for streamed data is not always accurate in the current version
- Streaming API calls return an async reference, while streamed callback/process payloads are atomized maps rather than typed structs
- Streaming increases the total number of tokens used slightly compared to non-streaming requests
- Error handling in streaming contexts requires special attention
- Streaming chunks are returned with atomized keys but without full struct typing; deltas are not accumulated into a final typed struct yet.
callback = fn
  :finish -> IO.puts("Stream finished")
  {:data, data} -> IO.inspect(data, label: "chunk")
  {:error, err} -> IO.inspect(err, label: "error")
end

msgs = [
  %ExOpenAI.Components.ChatCompletionRequestUserMessage{role: :user, content: "Hello!"},
  %ExOpenAI.Components.ChatCompletionRequestAssistantMessage{role: :assistant, content: "What's up?"},
  %ExOpenAI.Components.ChatCompletionRequestUserMessage{role: :user, content: "What is the color of the sky?"}
]

{:ok, _ref} =
  ExOpenAI.Chat.create_chat_completion(
    msgs,
    "gpt-3.5-turbo",
    logit_bias: %{"8043" => -100},
    stream: true,
    stream_to: callback
  )

Sample output (truncated):
chunk: %{
  id: "chatcmpl-…",
  model: "gpt-3.5-turbo-0125",
  created: 1764240033,
  object: "chat.completion.chunk",
  choices: [
    %{index: 0, logprobs: nil, delta: %{role: "assistant", content: ""}, finish_reason: nil}
  ],
  service_tier: "default",
  system_fingerprint: nil
}
chunk: %{
  id: "chatcmpl-…",
  model: "gpt-3.5-turbo-0125",
  choices: [
    %{index: 0, delta: %{content: "The color"}, finish_reason: nil}
  ],
  …
}
… (more chunks) …
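Since the deltas are not accumulated for you, chunks shaped like the sample output can be folded into the full text with a small pure function. This is a sketch, not library code: `ChunkText` is a hypothetical module name, the chunk maps below are trimmed copies of the sample output, and the final chunk with an empty delta and `finish_reason: "stop"` is an assumption about how a stream typically ends.

```elixir
defmodule ChunkText do
  # Hypothetical helper, not part of ExOpenAI.

  # Returns the text fragment of one chat chunk, or "" when the chunk has none.
  def content(%{choices: [%{delta: %{content: text}} | _]}) when is_binary(text), do: text
  def content(_chunk), do: ""

  # Concatenates the fragments of all chunks in the order given.
  def join(chunks) do
    Enum.map_join(chunks, &content/1)
  end
end

chunks = [
  %{choices: [%{index: 0, delta: %{role: "assistant", content: ""}, finish_reason: nil}]},
  %{choices: [%{index: 0, delta: %{content: "The color"}, finish_reason: nil}]},
  # Assumed shape of a closing chunk: no content in the delta.
  %{choices: [%{index: 0, delta: %{}, finish_reason: "stop"}]}
]

IO.puts(ChunkText.join(chunks))
```

Only the first choice of each chunk is read, which matches requests that ask for a single completion.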
- Buffer Management: Always maintain a buffer to reconstruct the complete response
- Error Handling: Implement robust error handling in your streaming clients
- Timeouts: Consider implementing timeouts for long-running streams
- Testing: Test your streaming code with both short and very long responses
- State Management: Design your streaming client's state to handle all the information you need
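The buffer and timeout points above can be combined in one pattern: forward every event to the calling process and collect with `receive ... after`. The sketch below is illustrative only. `StreamWait`, `forwarder/2`, and `collect/3` are hypothetical names that are not part of ExOpenAI, the stream is simulated by invoking the callback by hand, and the timeout measures silence between events rather than total stream duration.

```elixir
defmodule StreamWait do
  # Hypothetical helper, not part of ExOpenAI.

  # Returns a callback that forwards every stream event to `owner`, tagged with `tag`.
  def forwarder(owner, tag) do
    fn event -> send(owner, {tag, event}) end
  end

  # Collects forwarded chunks until :finish, an error, or `timeout` ms without any event.
  def collect(tag, timeout, acc \\ []) do
    receive do
      {^tag, {:data, data}} -> collect(tag, timeout, [data | acc])
      {^tag, :finish} -> {:ok, Enum.reverse(acc)}
      {^tag, {:error, err}} -> {:error, err, Enum.reverse(acc)}
    after
      timeout -> {:timeout, Enum.reverse(acc)}
    end
  end
end

tag = make_ref()
callback = StreamWait.forwarder(self(), tag)

# Simulated stream that stalls: one chunk arrives, :finish never does.
callback.({:data, "partial"})
IO.inspect(StreamWait.collect(tag, 100))
```

The unique reference used as a tag keeps events from different streams apart when one process waits on several of them, and the partial buffer is returned on timeout so the caller can decide whether to keep or discard it.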