25 changes: 25 additions & 0 deletions src/agent/dispatcher.rs
@@ -625,6 +625,31 @@ impl<'a> LoopDelegate for ChatDelegate<'a> {
}

let llm_call_start = std::time::Instant::now();

// Wire up real-time token streaming to the channel layer.
// A bounded channel caps memory use when the consumer (channel
// layer) is slower than the LLM; the producer drops chunks on
// overflow via `try_send`.
{
let (chunk_tx, mut chunk_rx) =
tokio::sync::mpsc::channel::<String>(256);
let channels = Arc::clone(&self.agent.channels);
Comment on lines +633 to +636

Copilot AI Apr 23, 2026

The streaming bridge uses a hard-coded channel capacity of 256. If this value is expected to be tuned (or kept consistent with other buffering limits), consider extracting it to a named constant or config to avoid a magic number here and make operational tuning easier.
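
A minimal sketch of that suggestion (the constant name and its placement are illustrative, not from the codebase): lift the capacity into a named constant so the buffer size can be tuned in one place.

```rust
/// Maximum number of in-flight stream chunks buffered between the LLM
/// producer and the channel-layer consumer; overflow chunks are dropped.
const STREAM_CHUNK_BUFFER: usize = 256;

let (chunk_tx, mut chunk_rx) =
    tokio::sync::mpsc::channel::<String>(STREAM_CHUNK_BUFFER);
```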
let channel_name = self.message.channel.clone();
let metadata = self.message.metadata.clone();
tokio::spawn(async move {
while let Some(chunk) = chunk_rx.recv().await {
let _ = channels
.send_status(
&channel_name,
crate::channels::StatusUpdate::StreamChunk(chunk),
&metadata,
)
.await;
}
});
reason_ctx.chunk_sender = Some(chunk_tx);
}
Comment on lines +629 to +651

Copilot AI Apr 22, 2026

The streaming forwarder uses an unbounded mpsc channel and awaits send_status() for every chunk. If the provider streams faster than the channel layer can deliver, the unbounded queue can grow without bound and increase memory usage. Prefer a bounded channel with try_send (dropping/coalescing when full) or another backpressure strategy suitable for token streaming.
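
The hunk above already switched to a bounded channel in response; a hedged sketch of the drop-on-overflow producer side it implies (the helper name is hypothetical, and the real send site lives inside the reasoning loop, which is not part of this diff):

```rust
use tokio::sync::mpsc::{error::TrySendError, Sender};

/// Forward one streamed token chunk without ever blocking the LLM stream.
fn forward_chunk(tx: &Sender<String>, chunk: String) {
    match tx.try_send(chunk) {
        Ok(()) => {}
        // Consumer is behind: drop the chunk rather than queueing unboundedly.
        Err(TrySendError::Full(_)) => {}
        // Receiver task has exited; nothing left to forward.
        Err(TrySendError::Closed(_)) => {}
    }
}
```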

let output = match reasoning.respond_with_tools(reason_ctx).await {
Ok(output) => output,
Err(crate::error::LlmError::ContextLengthExceeded { used, limit }) => {
36 changes: 36 additions & 0 deletions src/llm/circuit_breaker.rs
@@ -292,6 +292,42 @@ impl LlmProvider for CircuitBreakerProvider {
}
}

async fn complete_stream(
&self,
request: CompletionRequest,
on_chunk: &mut (dyn FnMut(String) + Send),
) -> Result<CompletionResponse, LlmError> {
self.check_allowed().await?;
match self.inner.complete_stream(request, on_chunk).await {
Ok(resp) => {
self.record_success().await;
Ok(resp)
}
Err(err) => {
self.record_failure(&err).await;
Err(err)
}
}
}

async fn complete_with_tools_stream(
&self,
request: ToolCompletionRequest,
on_chunk: &mut (dyn FnMut(String) + Send),
) -> Result<ToolCompletionResponse, LlmError> {
self.check_allowed().await?;
match self.inner.complete_with_tools_stream(request, on_chunk).await {
Ok(resp) => {
self.record_success().await;
Ok(resp)
}
Err(err) => {
self.record_failure(&err).await;
Err(err)
}
}
}

async fn list_models(&self) -> Result<Vec<String>, LlmError> {
self.inner.list_models().await
}
20 changes: 20 additions & 0 deletions src/llm/failover.rs
@@ -329,6 +329,26 @@ impl LlmProvider for FailoverProvider {
Ok(response)
}

async fn complete_stream(
&self,
request: CompletionRequest,
on_chunk: &mut (dyn FnMut(String) + Send),
) -> Result<CompletionResponse, LlmError> {
self.providers[self.last_used.load(Ordering::Relaxed)]
.complete_stream(request, on_chunk)
.await
}
Comment on lines +332 to +340

Copilot AI Apr 23, 2026

complete_stream / complete_with_tools_stream delegate to last_used, but they don’t call bind_provider_to_current_task(...) like the non-streaming paths. Under concurrency this can cause effective_model_name() (and therefore cost attribution/metrics) to report the wrong provider if another request updates last_used before the caller reads it. Capture the index up front and bind it for the current task before/after the streaming call.
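
A hedged sketch of that fix, assuming bind_provider_to_current_task takes the provider index as it does on the non-streaming paths (its exact signature is not shown in this diff): snapshot last_used once, bind it, then stream against that same provider.

```rust
async fn complete_stream(
    &self,
    request: CompletionRequest,
    on_chunk: &mut (dyn FnMut(String) + Send),
) -> Result<CompletionResponse, LlmError> {
    // Capture the index once so cost attribution and the actual call agree,
    // even if another request updates last_used concurrently.
    let idx = self.last_used.load(Ordering::Relaxed);
    self.bind_provider_to_current_task(idx);
    self.providers[idx].complete_stream(request, on_chunk).await
}
```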

async fn complete_with_tools_stream(
&self,
request: ToolCompletionRequest,
on_chunk: &mut (dyn FnMut(String) + Send),
) -> Result<ToolCompletionResponse, LlmError> {
self.providers[self.last_used.load(Ordering::Relaxed)]
.complete_with_tools_stream(request, on_chunk)
.await
}

fn active_model_name(&self) -> String {
self.providers[self.last_used.load(Ordering::Relaxed)].active_model_name()
}
39 changes: 37 additions & 2 deletions src/llm/mod.rs
@@ -25,6 +25,7 @@ mod nearai_chat;
pub mod oauth_helpers;
pub mod openai_codex_provider;
pub mod openai_codex_session;
mod openai_compat_stream;
mod provider;
mod reasoning;
pub mod recording;
@@ -293,7 +294,7 @@ fn create_openai_compat_from_registry(
builder = builder.base_url(&base_url);
}
if !extra_headers.is_empty() {
builder = builder.http_headers(extra_headers);
builder = builder.http_headers(extra_headers.clone());
}

let client: openai::Client = builder.build().map_err(|e| LlmError::RequestFailed {
@@ -316,7 +317,41 @@

let adapter = RigAdapter::new(model, &config.model)
.with_unsupported_params(config.unsupported_params.clone());
Ok(Arc::new(adapter))
// Re-use the already-validated header map: iterate it to build the
// (String, String) pairs for the streaming provider, skipping any that
// produced warnings above.
let extra_headers_vec: Vec<(String, String)> = extra_headers
.iter()
.filter_map(|(name, value)| {
value
.to_str()
.ok()
.map(|v| (name.as_str().to_string(), v.to_string()))
})
.collect();
let unsupported: std::collections::HashSet<String> =
config.unsupported_params.iter().cloned().collect();
// Normalize the base_url the same way the rig-core client does so the
// streaming path hits the exact same endpoint.
let streaming_base_url = if config.base_url.is_empty() {
String::new()
} else {
normalize_openai_base_url(&config.base_url)
};
let streaming = openai_compat_stream::OpenAiCompatStreamingProvider::new(
Arc::new(adapter),
api_key,
streaming_base_url,
config.model.clone(),
config.provider_id.clone(),
extra_headers_vec,
unsupported,
)
.map_err(|e| LlmError::RequestFailed {
provider: config.provider_id.clone(),
reason: format!("Failed to build streaming HTTP client: {e}"),
})?;
Ok(Arc::new(streaming))
}

fn create_anthropic_from_registry(