Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions src/openhuman/agent/triage/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ use crate::openhuman::providers::reliable::{
is_rate_limited, is_upstream_unhealthy, parse_retry_after_ms,
};
use crate::openhuman::providers::ChatMessage;
use crate::openhuman::scheduler_gate::LlmPermit;
use std::future::Future;

use crate::openhuman::config::Config;

Expand Down Expand Up @@ -149,20 +151,53 @@ pub async fn run_triage(envelope: &TriggerEnvelope) -> anyhow::Result<TriageOutc
.context("resolving provider for triage turn")?;
let local = build_local_provider_with_config(&config);

let outcome = run_triage_with_arms(cloud, local, envelope).await;
let outcome = run_triage_with_arms_inner(cloud, local, envelope, || {
crate::openhuman::scheduler_gate::wait_for_capacity()
})
.await;
if let Err(err) = &outcome {
events::publish_failed(envelope, &format!("{err}"));
}
outcome
}

/// Inner driver for [`run_triage`] that takes already-resolved arms.
/// Tests inject stub providers via this entry point.
/// Tests inject stub providers via this entry point and acquire the
/// global LLM permit for the local arm via the production gate.
pub async fn run_triage_with_arms(
    cloud: ResolvedProvider,
    local: Option<ResolvedProvider>,
    envelope: &TriggerEnvelope,
) -> anyhow::Result<TriageOutcome> {
    // Permit acquisition is deferred: the closure is handed to the inner
    // driver, which decides if and when to invoke it.
    let production_gate = || crate::openhuman::scheduler_gate::wait_for_capacity();

    run_triage_with_arms_inner(cloud, local, envelope, production_gate).await
}

/// Test-only variant of [`run_triage_with_arms`] that never touches the
/// global LLM permit.
///
/// The permit-acquisition step is replaced with a future that resolves
/// straight to `None`, so triage tests neither compete for
/// `scheduler_gate`'s process-wide 1-slot semaphore nor stall on a stale
/// `Paused` policy that another test's `init_global` call left in `STATE`.
#[cfg(test)]
pub async fn run_triage_with_arms_for_test(
    cloud: ResolvedProvider,
    local: Option<ResolvedProvider>,
    envelope: &TriggerEnvelope,
) -> anyhow::Result<TriageOutcome> {
    // No-op acquisition: yields "no permit" without awaiting anything.
    let skip_permit = || async { None };

    run_triage_with_arms_inner(cloud, local, envelope, skip_permit).await
}

async fn run_triage_with_arms_inner<F, Fut>(
cloud: ResolvedProvider,
local: Option<ResolvedProvider>,
envelope: &TriggerEnvelope,
acquire_permit: F,
) -> anyhow::Result<TriageOutcome>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Option<LlmPermit>>,
{
// ── Cloud arm ──────────────────────────────────────────────────
match try_arm(&cloud, envelope, TriageResolutionPath::Cloud).await {
Ok(run) => return Ok(TriageOutcome::Decision(run)),
Expand Down Expand Up @@ -209,7 +244,7 @@ pub async fn run_triage_with_arms(

// Hold the global LLM permit for the lifetime of the local turn —
// protects laptop RAM from concurrent local model calls (#1073).
let _gate_permit = crate::openhuman::scheduler_gate::wait_for_capacity().await;
let _gate_permit = acquire_permit().await;

match try_arm(&local, envelope, TriageResolutionPath::LocalFallback).await {
Ok(run) => Ok(TriageOutcome::Decision(run)),
Expand Down
14 changes: 7 additions & 7 deletions src/openhuman/agent/triage/evaluator_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ async fn happy_path_returns_cloud_resolution() {
})
.await;

let outcome = run_triage_with_arms(cloud_arm(), Some(local_arm()), &envelope())
let outcome = run_triage_with_arms_for_test(cloud_arm(), Some(local_arm()), &envelope())
.await
.expect("happy path must succeed");

Expand Down Expand Up @@ -184,7 +184,7 @@ async fn rate_limited_then_ok_marks_cloud_after_retry() {
})
.await;

let outcome = run_triage_with_arms(cloud_arm(), Some(local_arm()), &envelope())
let outcome = run_triage_with_arms_for_test(cloud_arm(), Some(local_arm()), &envelope())
.await
.expect("retry path must succeed");

Expand Down Expand Up @@ -219,7 +219,7 @@ async fn double_429_falls_through_to_local_fallback() {
})
.await;

let outcome = run_triage_with_arms(cloud_arm(), Some(local_arm()), &envelope())
let outcome = run_triage_with_arms_for_test(cloud_arm(), Some(local_arm()), &envelope())
.await
.expect("local fallback must succeed");

Expand Down Expand Up @@ -252,7 +252,7 @@ async fn cloud_5xx_falls_through_to_local_fallback() {
})
.await;

let outcome = run_triage_with_arms(cloud_arm(), Some(local_arm()), &envelope())
let outcome = run_triage_with_arms_for_test(cloud_arm(), Some(local_arm()), &envelope())
.await
.expect("local fallback must succeed after 5xx");

Expand All @@ -276,7 +276,7 @@ async fn cloud_then_local_failure_returns_deferred() {
})
.await;

let outcome = run_triage_with_arms(cloud_arm(), Some(local_arm()), &envelope())
let outcome = run_triage_with_arms_for_test(cloud_arm(), Some(local_arm()), &envelope())
.await
.expect("Deferred is Ok, not Err");

Expand Down Expand Up @@ -314,7 +314,7 @@ async fn fatal_cloud_error_short_circuits_without_local_attempt() {
})
.await;

let err = run_triage_with_arms(cloud_arm(), Some(local_arm()), &envelope())
let err = run_triage_with_arms_for_test(cloud_arm(), Some(local_arm()), &envelope())
.await
.expect_err("auth failure must surface as Err");

Expand Down Expand Up @@ -345,7 +345,7 @@ async fn no_local_arm_returns_deferred_after_cloud_exhaustion() {
})
.await;

let outcome = run_triage_with_arms(cloud_arm(), None, &envelope())
let outcome = run_triage_with_arms_for_test(cloud_arm(), None, &envelope())
.await
.expect("Deferred is Ok");

Expand Down
Loading