Skip to content

Commit f7a9b8d

Browse files
authored
fix: Add pluggable stream parser for Gemini JSON array streaming (#26)
Gemini's streamGenerateContent endpoint returns a JSON array by default, not SSE. Instead of forcing SSE with alt=sse, this adds a proper JSON array stream parser and makes HTTP.stream's buffer parser pluggable. - Add Nous.Providers.HTTP.JSONArrayParser for JSON array responses - Add :stream_parser option to HTTP.stream/4 (defaults to SSE) - Gemini provider uses JSONArrayParser for native format support - 18 new tests for the JSON array parser Bumps version to 0.12.2.
1 parent c58ae9d commit f7a9b8d

File tree

6 files changed

+338
-7
lines changed

6 files changed

+338
-7
lines changed

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,17 @@
22

33
All notable changes to this project will be documented in this file.
44

5+
## [0.12.2] - 2026-03-04
6+
7+
### Fixed
8+
9+
- **Gemini streaming**: Fixed streaming responses returning 0 events. The Gemini `streamGenerateContent` endpoint returns a JSON array (`application/json`) by default, not Server-Sent Events. Instead of forcing SSE via `alt=sse` query parameter, added a pluggable stream parser to `Nous.Providers.HTTP`.
10+
11+
### Added
12+
13+
- `Nous.Providers.HTTP.JSONArrayParser` — stream buffer parser for JSON array responses. Extracts complete JSON objects from a streaming `[{...},{...},...]` response by tracking `{}` nesting depth while respecting string literals and escape sequences.
14+
- `:stream_parser` option on `HTTP.stream/4` — accepts any module implementing `parse_buffer/1` with the same `{events, remaining_buffer}` contract as SSE parsing. Defaults to the existing SSE parser. Enables any provider with a non-SSE streaming format to plug in a custom parser.
15+
516
## [0.12.0] - 2026-02-28
617

718
### Added

lib/nous/providers/gemini.ex

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,11 @@ defmodule Nous.Providers.Gemini do
110110
# Remove model from params (it's in the URL)
111111
body = params |> Map.delete("model") |> Map.delete(:model)
112112

113-
HTTP.stream(url, body, headers, timeout: timeout, finch_name: finch_name)
113+
HTTP.stream(url, body, headers,
114+
timeout: timeout,
115+
finch_name: finch_name,
116+
stream_parser: Nous.Providers.HTTP.JSONArrayParser
117+
)
114118
end
115119

116120
# Build URL with model and API key in query params

lib/nous/providers/http.ex

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ defmodule Nous.Providers.HTTP do
9595
## Options
9696
* `:timeout` - Request timeout in ms (default: 60_000)
9797
* `:finch_name` - Finch pool name (default: Nous.Finch)
98+
* `:stream_parser` - Module for parsing the stream buffer (default: SSE parsing).
99+
Must implement `parse_buffer/1` returning `{events, remaining_buffer}`.
100+
See `Nous.Providers.HTTP.JSONArrayParser` for an example.
98101
99102
## Error Handling
100103
The stream will emit `{:stream_error, reason}` on errors and then halt.
@@ -106,6 +109,7 @@ defmodule Nous.Providers.HTTP do
106109
when is_binary(url) and is_map(body) and is_list(headers) do
107110
timeout = Keyword.get(opts, :timeout, @default_timeout)
108111
finch_name = Keyword.get(opts, :finch_name, Nous.Finch)
112+
stream_parser = Keyword.get(opts, :stream_parser)
109113

110114
case Jason.encode(body) do
111115
{:ok, json_body} ->
@@ -114,7 +118,9 @@ defmodule Nous.Providers.HTTP do
114118

115119
stream =
116120
Stream.resource(
117-
fn -> start_streaming(url, headers, json_body, finch_name, timeout) end,
121+
fn ->
122+
start_streaming(url, headers, json_body, finch_name, timeout, stream_parser)
123+
end,
118124
&next_chunk/1,
119125
&cleanup/1
120126
)
@@ -356,7 +362,7 @@ defmodule Nous.Providers.HTTP do
356362
end
357363

358364
# Start streaming - spawn a process to handle Finch.stream
359-
defp start_streaming(url, headers, body, finch_name, timeout) do
365+
defp start_streaming(url, headers, body, finch_name, timeout, stream_parser) do
360366
parent = self()
361367

362368
pid =
@@ -408,7 +414,8 @@ defmodule Nous.Providers.HTTP do
408414
done: false,
409415
status: nil,
410416
timeout: timeout,
411-
error: nil
417+
error: nil,
418+
stream_parser: stream_parser
412419
}
413420
end
414421

@@ -482,7 +489,7 @@ defmodule Nous.Providers.HTTP do
482489
Logger.error("SSE buffer overflow, terminating stream")
483490
{[{:stream_error, %{reason: :buffer_overflow}}], %{state | done: true}}
484491
else
485-
{events, remaining_buffer} = parse_sse_buffer(new_buffer)
492+
{events, remaining_buffer} = parse_stream_buffer(new_buffer, state.stream_parser)
486493

487494
# Filter out parse errors if we want to be lenient
488495
{valid_events, errors} =
@@ -505,7 +512,7 @@ defmodule Nous.Providers.HTTP do
505512

506513
{:sse, :done, :ok} ->
507514
# Flush any remaining buffer
508-
{events, _} = parse_sse_buffer(state.buffer <> "\n\n")
515+
{events, _} = flush_stream_buffer(state.buffer, state.stream_parser)
509516

510517
final_events =
511518
Enum.reject(events, fn
@@ -530,6 +537,16 @@ defmodule Nous.Providers.HTTP do
530537
end
531538
end
532539

540+
# Parse stream buffer using the configured parser (default: SSE)
541+
defp parse_stream_buffer(buffer, nil), do: parse_sse_buffer(buffer)
542+
defp parse_stream_buffer(buffer, parser_mod), do: parser_mod.parse_buffer(buffer)
543+
544+
# Flush remaining buffer at end of stream
545+
# SSE needs a trailing \n\n to force the last event through;
546+
# custom parsers just re-parse the remaining buffer as-is.
547+
defp flush_stream_buffer(buffer, nil), do: parse_sse_buffer(buffer <> "\n\n")
548+
defp flush_stream_buffer(buffer, parser_mod), do: parser_mod.parse_buffer(buffer)
549+
533550
# Cleanup when stream is done
534551
defp cleanup(state) do
535552
if state[:pid] && Process.alive?(state.pid) do
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
defmodule Nous.Providers.HTTP.JSONArrayParser do
2+
@moduledoc """
3+
Stream parser for JSON array responses.
4+
5+
Parses streaming HTTP responses where the body is a JSON array of objects:
6+
7+
[{"candidates":[...]},{"candidates":[...]},...]
8+
9+
Used by providers (like Gemini) that stream responses as a JSON array
10+
rather than Server-Sent Events. Has the same interface as
11+
`Nous.Providers.HTTP.parse_sse_buffer/1` so it can be used as a
12+
drop-in `:stream_parser` for `HTTP.stream/4`.
13+
14+
## How it works
15+
16+
Chunks arrive at arbitrary byte boundaries. The parser accumulates them
17+
in a buffer, skips array-level syntax (`[`, `]`, `,`, whitespace), and
18+
extracts complete top-level JSON objects by tracking `{}` nesting depth
19+
while respecting string literals and escape sequences.
20+
"""
21+
22+
require Logger
23+
24+
@doc """
25+
Parse a buffer containing chunks of a JSON array into individual events.
26+
27+
Returns `{events, remaining_buffer}` where events is a list of parsed
28+
JSON maps (same contract as `HTTP.parse_sse_buffer/1`).
29+
30+
## Examples
31+
32+
iex> parse_buffer(~s|[{"text":"hi"},{"text":"there"}]|)
33+
{[%{"text" => "hi"}, %{"text" => "there"}], ""}
34+
35+
iex> parse_buffer(~s|[{"text":"hi"},{"tex|)
36+
{[%{"text" => "hi"}], ~s|{"tex|}
37+
38+
iex> parse_buffer("")
39+
{[], ""}
40+
"""
41+
@spec parse_buffer(String.t()) :: {list(), String.t()}
42+
def parse_buffer(buffer) when is_binary(buffer) do
43+
extract_objects(buffer, [])
44+
end
45+
46+
def parse_buffer(_), do: {[], ""}
47+
48+
# Recursively extract complete JSON objects from the buffer
49+
defp extract_objects(buffer, acc) do
50+
trimmed = skip_array_syntax(buffer)
51+
52+
case extract_next_object(trimmed) do
53+
{:ok, json_str, rest} ->
54+
case Jason.decode(json_str) do
55+
{:ok, parsed} ->
56+
extract_objects(rest, [parsed | acc])
57+
58+
{:error, error} ->
59+
Logger.debug("JSON array parser: failed to decode object: #{inspect(error)}")
60+
# Don't consume more — the buffer might just need more data
61+
{Enum.reverse(acc), trimmed}
62+
end
63+
64+
:incomplete ->
65+
{Enum.reverse(acc), trimmed}
66+
end
67+
end
68+
69+
# Skip array-level syntax: [ ] , and whitespace between objects
70+
defp skip_array_syntax(<<c, rest::binary>>) when c in ~c|[,] \t\n\r|,
71+
do: skip_array_syntax(rest)
72+
73+
defp skip_array_syntax(buffer), do: buffer
74+
75+
# Extract the next complete top-level JSON object from the buffer.
76+
# Only starts extraction when buffer begins with `{`.
77+
defp extract_next_object(<<"{", _::binary>> = buffer) do
78+
case find_object_end(buffer, 0, 0, false) do
79+
{:ok, end_pos} ->
80+
<<json::binary-size(^end_pos), rest::binary>> = buffer
81+
{:ok, json, rest}
82+
83+
:incomplete ->
84+
:incomplete
85+
end
86+
end
87+
88+
defp extract_next_object(_), do: :incomplete
89+
90+
# Walk the buffer character by character tracking {} depth.
91+
# Respects JSON string boundaries and escape sequences.
92+
#
93+
# Returns {:ok, end_position} when a complete object is found,
94+
# or :incomplete if the buffer ends mid-object.
95+
96+
# Buffer exhausted before object closed
97+
defp find_object_end(<<>>, _pos, _depth, _in_string), do: :incomplete
98+
99+
# Escaped character inside a string — skip both bytes
100+
defp find_object_end(<<"\\", _, rest::binary>>, pos, depth, true) do
101+
find_object_end(rest, pos + 2, depth, true)
102+
end
103+
104+
# Quote toggles string state
105+
defp find_object_end(<<"\"", rest::binary>>, pos, depth, in_string) do
106+
find_object_end(rest, pos + 1, depth, not in_string)
107+
end
108+
109+
# Open brace outside string — increase depth
110+
defp find_object_end(<<"{", rest::binary>>, pos, depth, false) do
111+
find_object_end(rest, pos + 1, depth + 1, false)
112+
end
113+
114+
# Close brace outside string — decrease depth, check if object complete
115+
defp find_object_end(<<"}", rest::binary>>, pos, depth, false) do
116+
case depth - 1 do
117+
0 -> {:ok, pos + 1}
118+
new_depth -> find_object_end(rest, pos + 1, new_depth, false)
119+
end
120+
end
121+
122+
# Any other character — advance position
123+
defp find_object_end(<<_, rest::binary>>, pos, depth, in_string) do
124+
find_object_end(rest, pos + 1, depth, in_string)
125+
end
126+
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule Nous.MixProject do
22
use Mix.Project
33

4-
@version "0.12.1"
4+
@version "0.12.2"
55
@source_url "https://github.com/nyo16/nous"
66

77
def project do

0 commit comments

Comments
 (0)