feat(llm): add real-time token streaming through provider chain #2872
rizgan wants to merge 7 commits into nearai:staging from
Conversation
Add streaming capability to the LLM provider system so clients receive tokens progressively rather than waiting for the full response.

## New provider

OpenAiCompatStreamingProvider (src/llm/openai_compat_stream.rs):

- Wraps any OpenAI-compatible backend (OpenRouter, Groq, etc.)
- Sends POST with stream: true and stream_options.include_usage: true
- Parses SSE data: lines, accumulates delta content and tool_calls
- Implements both complete_stream and complete_with_tools_stream
- Falls back gracefully to the inner provider for non-streaming paths
- Used by the registry factory for all openai_compat backends

## Trait changes (src/llm/provider.rs)

Two new default methods on LlmProvider (see the sketch after this description):

- complete_stream(request, on_chunk) -- default: single-chunk fallback
- complete_with_tools_stream(request, on_chunk) -- same fallback

Default impls preserve backward compatibility for all existing providers.

## Provider chain delegation

All wrapper providers now forward streaming calls instead of falling back to the non-streaming default:

- RetryProvider -- delegates directly (streams are not retried)
- SmartRoutingProvider -- delegates to primary
- FailoverProvider -- delegates to last-used provider
- CircuitBreakerProvider -- with full check_allowed/record_success/failure
- CachedProvider -- bypasses cache (streaming takes priority)
- RecordingLlm -- bypasses recording (chunks are not replayable)

## Wiring (src/llm/reasoning.rs, src/agent/dispatcher.rs)

ReasoningContext gains an optional chunk_sender field. ChatDelegate::call_llm() spawns a forwarder task that reads from the channel and calls channels.send_status(StreamChunk), which the web gateway broadcasts as SSE stream_chunk events to connected clients.
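For orientation, here is a minimal sketch of the two default trait methods, assembled from the signatures quoted in the review diffs below. It assumes the async-trait pattern; the actual trait bounds and request/response types live in src/llm/provider.rs.

```rust
use async_trait::async_trait;

#[async_trait]
pub trait LlmProvider: Send + Sync {
    async fn complete(&self, request: CompletionRequest)
        -> Result<CompletionResponse, LlmError>;

    /// Default: run the non-streaming path and emit the whole response as a
    /// single chunk, so providers that predate streaming keep compiling.
    async fn complete_stream(
        &self,
        request: CompletionRequest,
        on_chunk: &mut (dyn FnMut(String) + Send),
    ) -> Result<CompletionResponse, LlmError> {
        let resp = self.complete(request).await?;
        on_chunk(resp.content.clone());
        Ok(resp)
    }
}
```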
This PR fixes a silent streaming regression where all LLM responses were delivered …

Root cause: The provider chain wrappers (RetryProvider → SmartRoutingProvider → …

What this PR does: …
Tested with OpenRouter (…)
Code Review
This pull request introduces real-time token streaming for LLM providers, specifically targeting OpenAI-compatible endpoints. It adds complete_stream and complete_with_tools_stream methods to the LlmProvider trait and implements them across various provider wrappers, including circuit breakers, failover, and retry mechanisms. A new OpenAiCompatStreamingProvider is added to handle the SSE delta protocol. The ReasoningContext now supports a chunk_sender to facilitate token delivery to the client. Feedback focuses on improving system stability by using a bounded channel for token streaming to prevent memory exhaustion and adding a request timeout to the HTTP client to avoid hanging tasks.
```rust
let (chunk_tx, mut chunk_rx) =
    tokio::sync::mpsc::unbounded_channel::<String>();
```
Using an unbounded_channel for token streaming can lead to unbounded memory growth if the LLM produces tokens faster than the channel consumer (the status update task) can process them. This is particularly risky if send_status involves network I/O or database operations that might stall. Consider using a bounded channel (tokio::sync::mpsc::channel) with a reasonable capacity (e.g., 100) to provide backpressure.
Suggested change:

```rust
let (chunk_tx, mut chunk_rx) =
    tokio::sync::mpsc::channel::<String>(100);
```
```rust
let client = reqwest::Client::builder()
    .connect_timeout(Duration::from_secs(30))
    .build()
```
The reqwest::Client is configured with a connect_timeout but lacks a total request timeout or a read timeout. While the comment mentions avoiding cutting off long streams, an entirely absent timeout can lead to tasks hanging indefinitely if the upstream server maintains an open connection but stops sending data. Consider adding a timeout or using a read_timeout to ensure the stream eventually terminates if stalled.
Suggested change:

```rust
let client = reqwest::Client::builder()
    .connect_timeout(Duration::from_secs(30))
    .timeout(Duration::from_secs(600))
    .build()
```
Pull request overview
Adds real-time token streaming to the LLM provider abstraction and wires it through the provider chain so clients can receive incremental output via SSE-style chunk forwarding.
Changes:
- Extends LlmProvider with streaming methods (defaulting to single-chunk fallback for backward compatibility).
- Delegates streaming through wrapper providers (retry/failover/circuit breaker/cache/recording/smart routing).
- Introduces an OpenAI-compatible streaming wrapper and wires chunk forwarding through ReasoningContext → dispatcher channel updates.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| src/llm/provider.rs | Adds default complete_stream / complete_with_tools_stream methods to the provider trait. |
| src/llm/openai_compat_stream.rs | New OpenAI-compatible SSE streaming implementation that accumulates deltas + tool_calls. |
| src/llm/mod.rs | Wraps registry OpenAI-compatible providers with the new streaming provider. |
| src/llm/reasoning.rs | Emits streaming chunks to an optional sender during LLM calls. |
| src/agent/dispatcher.rs | Spawns a forwarder task that relays chunks to channel status updates. |
| src/llm/smart_routing.rs | Forwards streaming calls to primary provider. |
| src/llm/retry.rs | Forwards streaming calls to inner provider (no retries for streams). |
| src/llm/failover.rs | Forwards streaming calls to last-used provider. |
| src/llm/circuit_breaker.rs | Adds streaming support with circuit breaker accounting. |
| src/llm/response_cache.rs | Bypasses cache for streaming calls. |
| src/llm/recording.rs | Bypasses recording for streaming calls. |
```rust
// Wire up real-time token streaming to the channel layer.
{
    let (chunk_tx, mut chunk_rx) =
        tokio::sync::mpsc::unbounded_channel::<String>();
    let channels = Arc::clone(&self.agent.channels);
    let channel_name = self.message.channel.clone();
    let metadata = self.message.metadata.clone();
    tokio::spawn(async move {
        while let Some(chunk) = chunk_rx.recv().await {
            let _ = channels
                .send_status(
                    &channel_name,
                    crate::channels::StatusUpdate::StreamChunk(chunk),
                    &metadata,
                )
                .await;
        }
    });
    reason_ctx.chunk_sender = Some(chunk_tx);
}
```
The streaming forwarder uses an unbounded mpsc channel and awaits send_status() for every chunk. If the provider streams faster than the channel layer can deliver, the unbounded queue can grow without bound and increase memory usage. Prefer a bounded channel with try_send (dropping/coalescing when full) or another backpressure strategy suitable for token streaming.
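A self-contained sketch of the lossy bounded pattern the comment describes, using tokio's mpsc try_send. The capacity and names are illustrative; the PR later settles on a capacity of 256.

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Illustrative capacity; a full buffer means the consumer is behind.
    let (tx, mut rx) = mpsc::channel::<String>(256);

    // Producer side: drop chunks when the buffer is full instead of
    // queueing without bound.
    let producer = tokio::spawn(async move {
        for i in 0..1000 {
            if let Err(mpsc::error::TrySendError::Full(chunk)) =
                tx.try_send(format!("tok{i} "))
            {
                // Lossy by design: a missed token only degrades the live
                // preview; the full text still arrives in the final response.
                eprintln!("dropped chunk: {chunk}");
            }
        }
    });

    // Consumer side: recv() ends once the producer drops its sender.
    while let Some(chunk) = rx.recv().await {
        print!("{chunk}");
    }
    producer.await.unwrap();
}
```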
```rust
/// POST `body` (with `"stream": true` already set) to the completions
/// endpoint, parse the SSE delta stream, and return the accumulated result.
async fn stream_request(
    &self,
    body: serde_json::Value,
    on_chunk: &mut (dyn FnMut(String) + Send),
) -> Result<OaiStreamResult, LlmError> {
    let url = self.completions_url();

    let mut builder = self
        .client
        .post(&url)
        .header("Authorization", format!("Bearer {}", self.api_key))
        .header("Content-Type", "application/json")
        .header("Accept", "text/event-stream");

    for (k, v) in &self.extra_headers {
        builder = builder.header(k.as_str(), v.as_str());
    }

    let response = builder.json(&body).send().await.map_err(|e| {
        LlmError::RequestFailed {
            provider: "openai_compat".to_string(),
            reason: e.to_string(),
        }
    })?;

    let status = response.status();
    if !status.is_success() {
        let code = status.as_u16();
        let retry_after = Some(crate::llm::retry::parse_retry_after(
            response.headers().get("retry-after"),
        ));
        let text = response.text().await.unwrap_or_default();
        let truncated = crate::agent::truncate_for_preview(&text, 512);
        return Err(match code {
            401 | 403 => LlmError::AuthFailed {
                provider: "openai_compat".to_string(),
            },
            429 => LlmError::RateLimited {
                provider: "openai_compat".to_string(),
                retry_after,
            },
            _ => LlmError::RequestFailed {
                provider: "openai_compat".to_string(),
                reason: format!("HTTP {}: {}", status, truncated),
            },
        });
    }

    let mut result = OaiStreamResult::default();
    // BTreeMap keyed by tool_call index — OpenAI streams tool_call arguments
    // as incremental string deltas that must be concatenated in order.
    let mut tool_acc: BTreeMap<u32, PartialTool> = BTreeMap::new();

    let stream = response
        .bytes_stream()
        .map(|chunk| chunk.map_err(|e| e.to_string()));
    let mut event_stream = stream.eventsource();

    while let Some(event) = event_stream.next().await {
        let event = event.map_err(|e| LlmError::RequestFailed {
            provider: "openai_compat".to_string(),
            reason: format!("SSE stream error: {}", e),
        })?;

        let data = event.data.trim();
        if data == "[DONE]" {
            break;
        }
        if data.is_empty() {
            continue;
        }

        let parsed: serde_json::Value = match serde_json::from_str(data) {
            Ok(v) => v,
            Err(_) => continue,
        };

        if let Some(choices) = parsed.get("choices").and_then(|c| c.as_array())
            && let Some(choice) = choices.first()
        {
            if let Some(fr) = choice.get("finish_reason").and_then(|v| v.as_str()) {
                result.finish_reason = match fr {
                    "stop" => FinishReason::Stop,
                    "length" => FinishReason::Length,
                    "tool_calls" => FinishReason::ToolUse,
                    "content_filter" => FinishReason::ContentFilter,
                    _ => result.finish_reason,
                };
            }

            if let Some(delta) = choice.get("delta") {
                if let Some(content) = delta.get("content").and_then(|c| c.as_str())
                    && !content.is_empty()
                {
                    result.content.push_str(content);
                    on_chunk(content.to_string());
                }

                if let Some(tcs) = delta.get("tool_calls").and_then(|tc| tc.as_array()) {
                    for tc in tcs {
                        let idx = tc
                            .get("index")
                            .and_then(|v| v.as_u64())
                            .unwrap_or(0) as u32;
                        let entry = tool_acc.entry(idx).or_default();
                        if let Some(id) = tc.get("id").and_then(|v| v.as_str())
                            && !id.is_empty()
                        {
                            entry.id = id.to_string();
                        }
                        if let Some(func) = tc.get("function") {
                            if let Some(name) =
                                func.get("name").and_then(|v| v.as_str())
                                && !name.is_empty()
                            {
                                entry.name = name.to_string();
                            }
                            if let Some(args) =
                                func.get("arguments").and_then(|v| v.as_str())
                            {
                                entry.arguments.push_str(args);
                            }
                        }
                    }
                }
            }
        }

        // Usage is typically in the last chunk when stream_options.include_usage is set.
        if let Some(usage) = parsed.get("usage") {
            result.input_tokens = saturate_u32(
                usage
                    .get("prompt_tokens")
                    .and_then(|v| v.as_u64())
                    .unwrap_or(0),
            );
            result.output_tokens = saturate_u32(
                usage
                    .get("completion_tokens")
                    .and_then(|v| v.as_u64())
                    .unwrap_or(0),
            );
        }
    }

    result.tool_calls = tool_acc
        .into_values()
        .filter(|p| !p.name.is_empty())
        .map(|p| {
            let arguments = serde_json::from_str::<serde_json::Value>(&p.arguments)
                .unwrap_or_else(|_| serde_json::Value::Object(Default::default()));
            ToolCall {
                id: p.id,
                name: p.name,
                arguments,
                reasoning: None,
            }
        })
        .collect();

    Ok(result)
}
```
OpenAiCompatStreamingProvider::stream_request contains substantial SSE parsing logic (delta accumulation, finish_reason mapping, tool_call reconstruction, usage extraction) but there are no unit tests covering common and edge-case streams (multiple tool_calls, missing usage, malformed JSON events, etc.). Other streaming parsers in this repo include targeted tests (e.g. src/llm/openai_codex_provider.rs). Adding focused tests here would help prevent regressions across OpenAI-compatible backends.
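A self-contained sketch of what such tests could look like, re-stating the accumulation rules against raw SSE data payloads. The `accumulate` helper is illustrative, not the PR's API; real tests would exercise stream_request itself.

```rust
#[cfg(test)]
mod tests {
    use serde_json::Value;
    use std::collections::BTreeMap;

    /// Minimal re-statement of the delta-accumulation rules, assuming the
    /// payload layout shown in the diff above: idx -> (name, raw arguments).
    fn accumulate(events: &[&str]) -> (String, BTreeMap<u64, (String, String)>) {
        let mut content = String::new();
        let mut tools: BTreeMap<u64, (String, String)> = BTreeMap::new();
        for data in events {
            if *data == "[DONE]" {
                break;
            }
            let Ok(v) = serde_json::from_str::<Value>(data) else { continue };
            let Some(delta) = v.pointer("/choices/0/delta") else { continue };
            if let Some(c) = delta.get("content").and_then(Value::as_str) {
                content.push_str(c);
            }
            if let Some(tcs) = delta.get("tool_calls").and_then(Value::as_array) {
                for tc in tcs {
                    let idx = tc.get("index").and_then(Value::as_u64).unwrap_or(0);
                    let entry = tools.entry(idx).or_default();
                    if let Some(n) = tc.pointer("/function/name").and_then(Value::as_str) {
                        entry.0 = n.to_string();
                    }
                    if let Some(a) = tc.pointer("/function/arguments").and_then(Value::as_str) {
                        entry.1.push_str(a);
                    }
                }
            }
        }
        (content, tools)
    }

    #[test]
    fn concatenates_content_and_tool_arguments_in_order() {
        let (content, tools) = accumulate(&[
            r#"{"choices":[{"delta":{"content":"Hel"}}]}"#,
            r#"{"choices":[{"delta":{"content":"lo"}}]}"#,
            r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"name":"get_weather","arguments":"{\"city\":"}}]}}]}"#,
            r#"{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"\"Oslo\"}"}}]}}]}"#,
            "not json", // malformed events are skipped, not fatal
            "[DONE]",
        ]);
        assert_eq!(content, "Hello");
        assert_eq!(tools[&0].0, "get_weather");
        assert_eq!(tools[&0].1, r#"{"city":"Oslo"}"#);
    }
}
```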
```rust
// Use only a connect-timeout so that long streams are not cut off.
let client = reqwest::Client::builder()
    .connect_timeout(Duration::from_secs(30))
    .build()
    .expect("failed to build reqwest::Client for openai_compat streaming");
Self {
```
OpenAiCompatStreamingProvider::new() uses .expect(...) on reqwest::Client::builder().build(), which can panic the process at runtime (e.g., TLS backend misconfig). Consider returning Result<Self, LlmError> (or building the client outside and passing it in) so provider construction failures are surfaced as normal errors instead of crashing.
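One possible shape for a non-panicking constructor, sketched with stand-in types (BuildError and StreamingClient are illustrative, not the crate's names). The 600s total timeout reflects the earlier suggestion.

```rust
use std::time::Duration;

// Stand-ins for illustration; the real code would use LlmError and the
// provider struct from src/llm/openai_compat_stream.rs.
#[derive(Debug)]
struct BuildError(String);

struct StreamingClient {
    client: reqwest::Client,
}

impl StreamingClient {
    /// Surface client construction failures as an error instead of
    /// panicking via expect() at provider-registry build time.
    fn new() -> Result<Self, BuildError> {
        let client = reqwest::Client::builder()
            .connect_timeout(Duration::from_secs(30))
            .timeout(Duration::from_secs(600))
            .build()
            .map_err(|e| BuildError(format!("reqwest client: {e}")))?;
        Ok(Self { client })
    }
}
```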
```rust
    config.model.clone(),
    extra_headers_vec,
    unsupported,
);
```
The streaming wrapper is constructed with config.base_url.clone(), but the rig-core client uses normalize_openai_base_url(&config.base_url) to append /v1 for bare host URLs. Because OpenAiCompatStreamingProvider::completions_url() assumes the base URL already includes the API version prefix, streaming requests can hit the wrong path (e.g. http://localhost:8080/chat/completions instead of /v1/chat/completions). Pass the normalized base URL into the streaming provider (or normalize inside completions_url()) so streaming and non-streaming calls target the same endpoint.
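A runnable sketch of the normalization idea, assuming (as the comment describes) that bare host URLs need a /v1 segment appended. The real normalize_openai_base_url may apply more rules; this stand-in only shows the intent.

```rust
/// Hypothetical stand-in for normalizing inside completions_url(); the
/// crate's normalize_openai_base_url is the authoritative implementation.
fn completions_url(base_url: &str) -> String {
    let base = base_url.trim_end_matches('/');
    if base.ends_with("/v1") {
        format!("{base}/chat/completions")
    } else {
        format!("{base}/v1/chat/completions")
    }
}

fn main() {
    // Bare host: the version prefix gets appended.
    assert_eq!(
        completions_url("http://localhost:8080"),
        "http://localhost:8080/v1/chat/completions"
    );
    // Already versioned: left alone.
    assert_eq!(
        completions_url("https://openrouter.ai/api/v1"),
        "https://openrouter.ai/api/v1/chat/completions"
    );
}
```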
```rust
    config.model.clone(),
    extra_headers_vec,
    unsupported,
);
```
extra_headers are validated and invalid names/values are skipped when building the rig-core OpenAI-compatible client, but the streaming wrapper rebuilds headers directly from config.extra_headers without validation. This can make streaming fail in cases where non-streaming works (or reintroduce headers that were intentionally skipped). Reuse the already-validated HeaderMap (convert it to pairs) or apply the same validation/skipping logic before storing extra_headers for streaming.
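A sketch of reusing reqwest's own header validation so the streaming path skips exactly what the rig-core path skips. The function name and log line are illustrative.

```rust
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};

/// Build extra headers with the same skip-on-invalid policy as the
/// non-streaming client, so both paths accept and reject identical configs.
fn validated_headers(pairs: &[(String, String)]) -> HeaderMap {
    let mut headers = HeaderMap::new();
    for (k, v) in pairs {
        match (HeaderName::from_bytes(k.as_bytes()), HeaderValue::from_str(v)) {
            (Ok(name), Ok(value)) => {
                headers.insert(name, value);
            }
            _ => tracing::warn!(header = %k, "skipping invalid extra header"),
        }
    }
    headers
}
```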
```rust
let model = req
    .take_model_override()
    .unwrap_or_else(|| self.model_name.clone());
let messages = messages_to_json(&req.messages);
```
complete_stream bypasses the inner RigAdapter and serializes req.messages directly, but it does not run sanitize_tool_messages like RigAdapter::complete() does (src/llm/rig_adapter.rs:1031). This can reintroduce OpenAI 400s due to orphaned tool_result messages. Sanitize (and apply any other message normalization you rely on) before calling messages_to_json().
```rust
let model = req
    .take_model_override()
    .unwrap_or_else(|| self.model_name.clone());
let messages = messages_to_json(&req.messages);
```
complete_with_tools_stream bypasses the inner RigAdapter but does not call sanitize_tool_messages on req.messages (which RigAdapter::complete_with_tools() does at src/llm/rig_adapter.rs:1088). This can cause upstream request failures if the history contains orphaned tool_result messages. Sanitize messages before converting them to OpenAI JSON.
Suggested change:

```rust
fn sanitize_openai_tool_messages(messages: serde_json::Value) -> serde_json::Value {
    let Some(items) = messages.as_array() else {
        return messages;
    };
    let mut pending_tool_call_ids = HashSet::new();
    let mut sanitized = Vec::with_capacity(items.len());
    for message in items {
        let Some(obj) = message.as_object() else {
            sanitized.push(message.clone());
            continue;
        };
        match obj.get("role").and_then(|role| role.as_str()) {
            Some("assistant") => {
                if let Some(tool_calls) = obj.get("tool_calls").and_then(|tc| tc.as_array()) {
                    for tool_call in tool_calls {
                        if let Some(id) = tool_call
                            .as_object()
                            .and_then(|tc| tc.get("id"))
                            .and_then(|id| id.as_str())
                        {
                            pending_tool_call_ids.insert(id.to_string());
                        }
                    }
                }
                sanitized.push(message.clone());
            }
            Some("tool") => {
                let tool_call_id = obj.get("tool_call_id").and_then(|id| id.as_str());
                if let Some(tool_call_id) = tool_call_id
                    && pending_tool_call_ids.remove(tool_call_id)
                {
                    sanitized.push(message.clone());
                }
            }
            _ => sanitized.push(message.clone()),
        }
    }
    serde_json::Value::Array(sanitized)
}

let model = req
    .take_model_override()
    .unwrap_or_else(|| self.model_name.clone());
let messages = sanitize_openai_tool_messages(messages_to_json(&req.messages));
```
```rust
result.tool_calls = tool_acc
    .into_values()
    .filter(|p| !p.name.is_empty())
    .map(|p| {
        let arguments = serde_json::from_str::<serde_json::Value>(&p.arguments)
            .unwrap_or_else(|_| serde_json::Value::Object(Default::default()));
        ToolCall {
            id: p.id,
            name: p.name,
            arguments,
            reasoning: None,
        }
    })
    .collect();
```
When a streamed tool_call arguments string fails JSON parsing, the code currently falls back to an empty object ({}). This can silently change semantics by invoking a tool with missing parameters instead of preserving the raw payload or surfacing an invalid-response error. Other providers preserve the raw string on parse failure (e.g. src/llm/openai_codex_provider.rs:671-674). Consider returning LlmError::InvalidResponse or storing arguments as a serde_json::Value::String when parsing fails.
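A sketch of the closure body with the raw payload preserved as a JSON string, and, anticipating a later review note in this thread, with the raw text kept out of the warn log:

```rust
// Sketch: fall back to a JSON string of the raw payload, not an empty object.
let arguments = match serde_json::from_str::<serde_json::Value>(&p.arguments) {
    Ok(v) => v,
    Err(e) => {
        // Log only metadata; the raw text may contain user data.
        tracing::warn!(
            tool = %p.name,
            error = %e,
            raw_len = p.arguments.len(),
            "streamed tool_call arguments were not valid JSON"
        );
        serde_json::Value::String(p.arguments.clone())
    }
};
```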
- dispatcher+reasoning: replace unbounded channel with bounded(256) + try_send to drop chunks on overflow instead of growing memory without bound
- mod.rs: normalize base_url via normalize_openai_base_url before passing to streaming provider so the streaming path hits the same endpoint as rig-core
- mod.rs: reuse validated HeaderMap to build extra_headers for streaming provider (skips invalid names/values instead of passing raw)
- openai_compat_stream: OpenAiCompatStreamingProvider::new now returns Result instead of panicking via expect() on reqwest build failure
- openai_compat_stream: add total request timeout (600s) in addition to connect_timeout so hung upstream cannot leak tasks
- openai_compat_stream: call sanitize_tool_messages before messages_to_json in both streaming methods (matches RigAdapter behavior)
- openai_compat_stream: preserve raw tool_call argument string (as JSON string) on parse failure instead of silently defaulting to {} + log warning
… and SwappableLlmProvider
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 5 comments.
```rust
async fn stream_request(
    &self,
    body: serde_json::Value,
    on_chunk: &mut (dyn FnMut(String) + Send),
) -> Result<OaiStreamResult, LlmError> {
```
stream_request/SSE parsing and tool_call accumulation is substantial new logic but has no unit coverage. Consider adding focused tests like codex_chatgpt.rs’s SSE parser tests (e.g., content deltas, tool_call argument concatenation, usage-in-last-chunk, and [DONE] termination), plus an error-mapping test for 5xx/400/413 paths.
```rust
    _ => LlmError::RequestFailed {
        provider: "openai_compat".to_string(),
        reason: format!("HTTP {}: {}", status, truncated),
    },
});
```
Error responses currently fall through to LlmError::RequestFailed for all non-401/403/429 statuses, including HTTP 5xx. Per .claude/rules/error-handling.md and the LlmError::BadGateway docs, upstream 5xx bodies must not be carried in a user-facing error; map 500..=599 to LlmError::BadGateway { status, retry_after } and only log a truncated body preview at debug! for operators.
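A sketch of the mapping as extra arms inside the existing `return Err(match code { ... })`, reusing the bindings from the quoted diff. The BadGateway field names follow the `{ status, retry_after }` shape the comment cites; treat them as assumptions.

```rust
// Sketch: keep upstream 5xx bodies out of user-facing errors; operators
// still get a truncated preview at debug level.
500..=599 => {
    tracing::debug!(status = code, body = %truncated, "upstream 5xx during streaming");
    LlmError::BadGateway { status: code, retry_after }
}
_ => LlmError::RequestFailed {
    provider: "openai_compat".to_string(),
    reason: format!("HTTP {}: {}", status, truncated),
},
```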
```rust
if !status.is_success() {
    let code = status.as_u16();
    let retry_after = Some(crate::llm::retry::parse_retry_after(
        response.headers().get("retry-after"),
    ));
```
The streaming wrapper doesn't currently translate context-length / payload-too-large failures into LlmError::ContextLengthExceeded, so the dispatcher auto-compaction path won't trigger for streaming calls. Handle HTTP 400/413 by inspecting the error payload (similar patterns as rig_adapter::map_rig_error / nearai_chat) and returning ContextLengthExceeded { used, limit } where possible.
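A sketch of the detection, placed before the match on `code` inside the `!status.is_success()` branch. The body heuristic and the `{ used, limit }` fields are assumptions; mirror map_rig_error's actual rules and parse real token counts from the payload where present.

```rust
// Sketch: route payload-too-large and context-length 400s to the variant
// the dispatcher's auto-compaction watches for.
if code == 413
    || (code == 400 && text.contains("context_length_exceeded"))
{
    // Assumed fields; fill from the error payload when the upstream
    // reports actual token counts.
    return Err(LlmError::ContextLengthExceeded { used: 0, limit: 0 });
}
```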
```rust
let retry_after = Some(crate::llm::retry::parse_retry_after(
    response.headers().get("retry-after"),
));
let text = response.text().await.unwrap_or_default();
```
response.text().await.unwrap_or_default() is a silent-failure pattern forbidden by .claude/rules/error-handling.md (it collapses IO errors into an empty body). Prefer unwrap_or_else with an explicit marker string, or map the body read failure into LlmError::RequestFailed so debugging isn't silently degraded.
Suggested change:

```rust
let text = response
    .text()
    .await
    .unwrap_or_else(|e| format!("<failed to read error body: {}>", e));
```
```rust
// Match RigAdapter behavior: rewrite orphaned tool_result messages as
// user messages so OpenAI-compatible endpoints do not reject the
// request with 400 "messages with role 'tool' must be a response to
// a preceeding message with 'tool_calls'".
```
Typo in the error message example: "preceeding" should be "preceding".
Suggested change:

```rust
// a preceding message with 'tool_calls'".
```
- unwrap_or_else on error body read to preserve failure context
- map HTTP 5xx to LlmError::BadGateway (no body leak, operator debug log)
- map HTTP 413 and context-length 400 to LlmError::ContextLengthExceeded
- fix typo: preceeding -> preceding
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 6 comments.
```rust
/// When set, each text token/chunk from streaming LLM calls is sent to this
/// channel so callers can forward it to the client in real time.
///
/// Bounded to apply backpressure; chunks that cannot be queued are dropped.
```
The doc comment says the channel is "bounded to apply backpressure" but the implementation uses try_send and explicitly drops chunks on overflow, which is lossy rather than backpressure. Consider rewording to reflect the actual behavior (bounded to cap memory; drops when full), or switch to send().await if true backpressure is desired.
Suggested change:

```rust
/// Typically backed by a bounded channel to cap memory usage; chunks that
/// cannot be queued may be dropped when the buffer is full.
```
```rust
// Inject current date/time so the model can answer time-related questions
// without guessing or intercepting them at the bot layer.
let now_str = match tz {
    Some(t) => {
        let dt = crate::timezone::now_in_tz(t);
        format!("{} ({})", dt.format("%Y-%m-%d %H:%M"), t)
    }
    None => {
        format!("{} (UTC)", Utc::now().format("%Y-%m-%d %H:%M"))
    }
};
parts.push(format!("## Current Time\n\n{}", now_str));
```
This change injects current time into the system prompt, but the PR description focuses on LLM streaming/provider-chain changes and doesn't mention prompt-time injection. If this is intentional, please update the PR description (and/or split into a separate PR) so reviewers can assess the behavioral impact on prompting and caching independently.
```rust
let response = builder.json(&body).send().await.map_err(|e| {
    LlmError::RequestFailed {
        provider: "openai_compat".to_string(),
        reason: e.to_string(),
    }
```
The streaming provider hardcodes provider: "openai_compat" in generated LlmErrors. Since this wrapper is used for multiple registry backends (OpenRouter, Groq, etc.), error attribution/logging becomes ambiguous. Consider passing provider_id (or a human-readable label) into OpenAiCompatStreamingProvider and using it consistently in LlmError construction and logs.
```rust
ToolCall {
    id: p.id,
    name: p.name,
    arguments,
    reasoning: None,
}
```
On streamed tool calls, ToolCall.id is taken directly from the upstream tool_calls[*].id and may be empty or non-compliant with downstream constraints (some providers require a non-empty [A-Za-z0-9]{9} ID). Consider normalizing/generating a compliant ID when p.id is empty/invalid (similar to rig_adapter::normalized_tool_call_id / generate_tool_call_id) to avoid tool-execution failures.
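A sketch of the normalization the comment points at, enforcing the [A-Za-z0-9]{9} constraint. The function name is illustrative; the crate's normalized_tool_call_id / generate_tool_call_id helpers in rig_adapter are the real home for this logic.

```rust
use rand::Rng;

/// Return `id` unchanged if it already satisfies [A-Za-z0-9]{9}; otherwise
/// generate a fresh compliant ID so downstream tool execution never sees
/// an empty or malformed one.
fn normalize_tool_call_id(id: &str) -> String {
    if id.len() == 9 && id.bytes().all(|b| b.is_ascii_alphanumeric()) {
        return id.to_string();
    }
    const ALPHABET: &[u8] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
    let mut rng = rand::thread_rng();
    (0..9)
        .map(|_| ALPHABET[rng.gen_range(0..ALPHABET.len())] as char)
        .collect()
}
```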
```rust
tracing::warn!(
    tool = %p.name,
    error = %e,
    raw = %p.arguments,
```
Logging the full raw streamed tool-call arguments at warn level can leak user data and secrets into logs. Prefer omitting the raw payload (or logging only a truncated/sanitized preview) while still preserving the raw string in the returned ToolCall.arguments for downstream error reporting.
Suggested change:

```rust
raw_len = p.arguments.len(),
```
```rust
let mut parts =
    vec![serde_json::json!({"type": "text", "text": msg.content})];
for p in &msg.content_parts {
    parts.push(serde_json::to_value(p).unwrap_or_default());
```
serde_json::to_value(p).unwrap_or_default() silently drops serialization errors and inserts a default null part, which can produce malformed requests that are hard to diagnose. Prefer propagating/returning an error (or at least logging and skipping the part) so failures are explicit.
Suggested change:

```rust
match serde_json::to_value(p) {
    Ok(value) => parts.push(value),
    Err(err) => eprintln!(
        "failed to serialize chat message content part for role '{}': {}",
        role, err
    ),
}
```
- Fix doc comment: 'backpressure' -> accurate 'may be dropped' wording
- Add provider_id field to OpenAiCompatStreamingProvider for correct
error attribution across OpenRouter, Groq, etc. backends
- Replace raw = %p.arguments with raw_len to avoid leaking user data
in warn! logs
- Normalize streamed tool_call IDs via normalize_tool_call_id_for_streaming
(same [a-zA-Z0-9]{9} constraint as rig_adapter)
- Replace to_value(p).unwrap_or_default() with explicit warn + skip
…sponses" This reverts commit becfbc8.
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 4 comments.
```rust
let code = status.as_u16();
let retry_after = Some(crate::llm::retry::parse_retry_after(
    response.headers().get("retry-after"),
));
let text = response
```
retry_after is always wrapped in Some(parse_retry_after(...)), which means missing Retry-After headers become Some(60s) and defeats the exponential backoff path for BadGateway retries (this is the same regression called out in retry.rs docs/tests). Preserve header absence by using response.headers().get("retry-after").map(parse_retry_after_value) for 5xx (and only default missing headers for 429 if desired).
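A sketch of the fix the comment spells out, preserving header absence as None. parse_retry_after_value is the per-value helper the reviewer names; per the comment, the existing parse_retry_after defaults missing headers to 60s, which is what defeats backoff.

```rust
// Sketch: None when the header is absent, so exponential backoff applies
// to BadGateway retries instead of a synthetic 60s delay.
let retry_after = response
    .headers()
    .get("retry-after")
    .map(crate::llm::retry::parse_retry_after_value);
```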
```rust
async fn complete_stream(
    &self,
    request: CompletionRequest,
    on_chunk: &mut (dyn FnMut(String) + Send),
) -> Result<CompletionResponse, LlmError> {
    self.providers[self.last_used.load(Ordering::Relaxed)]
        .complete_stream(request, on_chunk)
        .await
}
```
complete_stream / complete_with_tools_stream delegate to last_used, but they don’t call bind_provider_to_current_task(...) like the non-streaming paths. Under concurrency this can cause effective_model_name() (and therefore cost attribution/metrics) to report the wrong provider if another request updates last_used before the caller reads it. Capture the index up front and bind it for the current task before/after the streaming call.
```rust
    on_chunk: &mut (dyn FnMut(String) + Send),
) -> Result<CompletionResponse, LlmError> {
    let resp = self.complete(request).await?;
    on_chunk(resp.content.clone());
```
The default complete_stream() fallback always invokes on_chunk even when resp.content is empty, while complete_with_tools_stream() explicitly suppresses empty content. Consider aligning behavior by skipping the callback when the content is empty to avoid emitting spurious empty stream chunks to clients.
Suggested change:

```rust
if !resp.content.is_empty() {
    on_chunk(resp.content.clone());
}
```
```rust
{
    let (chunk_tx, mut chunk_rx) =
        tokio::sync::mpsc::channel::<String>(256);
    let channels = Arc::clone(&self.agent.channels);
```
The streaming bridge uses a hard-coded channel capacity of 256. If this value is expected to be tuned (or kept consistent with other buffering limits), consider extracting it to a named constant or config to avoid a magic number here and make operational tuning easier.
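A small sketch of the extraction; the constant name is hypothetical and the value matches the current hard-coded buffer.

```rust
/// Max buffered stream chunks before try_send starts dropping; named so
/// the dispatcher and any future tuning share one definition.
const STREAM_CHUNK_BUFFER: usize = 256;

let (chunk_tx, mut chunk_rx) =
    tokio::sync::mpsc::channel::<String>(STREAM_CHUNK_BUFFER);
```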
Summary

Change Type

Linked Issue

Validation

- `cargo fmt --all -- --check`
- `cargo clippy --all --benches --tests --examples --all-features -- -D warnings`
- `cargo build`
- `cargo test --features integration` if database-backed or integration behavior changed
- `review-pro` / `rpr-shepherd --fix` was run before requesting review

Security Impact

Database Impact

Blast Radius

Rollback Plan

Review Follow-Through

Review track: