Skip to content
Merged
83 changes: 48 additions & 35 deletions src/openhuman/scheduler_gate/gate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,16 @@ pub fn update_config(cfg: SchedulerGateConfig) {
/// (e.g. in unit tests) so callers don't deadlock waiting on a sampler that
/// will never start.
///
/// When the signed-out override is set, returns [`Policy::Paused`] with
/// [`PauseReason::SignedOut`] unconditionally — this is the top-priority
/// "host should do no LLM work" signal and ignores config / signals.
/// When the signed-out override is set **and the gate has been initialised**,
/// returns [`Policy::Paused`] with [`PauseReason::SignedOut`] — this is the
/// top-priority "host should do no LLM work" signal and ignores config /
/// signals. We gate on [`STATE`] being present because the override only has
/// a meaningful effect when there are real background workers calling into
/// the gate; in unit tests where `init_global` was never called, the
/// process-global atomic can leak `true` from an earlier test and deadlock
/// every subsequent caller (see `wait_for_capacity` for the deadlock path).
pub fn current_policy() -> Policy {
if is_signed_out() {
if STATE.get().is_some() && is_signed_out() {
return Policy::Paused {
reason: PauseReason::SignedOut,
};
Expand Down Expand Up @@ -217,7 +222,19 @@ pub async fn wait_for_capacity() -> Option<LlmPermit> {
// cadence as the rest of the Paused arm. Holding here (rather than
// returning) means workers naturally resume the instant the user
// signs back in — no respawn dance, no missed wakeups.
if is_signed_out() {
//
// We gate on `STATE.get().is_some()` so the override only fires once
// the gate has been initialised by `init_global`. In unit tests
// where `init_global` was never called there is no background-worker
// pool to stand down, but the process-global `SIGNED_OUT` atomic can
// still leak `true` from an earlier test that exercised the
// credentials / 401 paths (`clear_session`, RPC 401 dispatch, or
// `SessionExpiredSubscriber.handle()`). Without the gate, every
// subsequent caller of `wait_for_capacity` polls forever on the
// 60-second fallback cadence — which manifested as the
// `openhuman::agent::triage::evaluator::tests::*` hangs reported
// after #1516.
if STATE.get().is_some() && is_signed_out() {
let paused_ms = STATE
.get()
.map(|s| s.read().cfg.paused_poll_ms)
Expand Down Expand Up @@ -369,50 +386,46 @@ mod tests {
}

#[tokio::test]
async fn signed_out_override_pauses_policy_regardless_of_signals() {
async fn signed_out_is_ignored_when_gate_uninit() {
// In unit tests `init_global` is never called, so `STATE.get()` returns `None`.
// In that state the signed-out override is intentionally inert: there
// are no background workers to stand down, and honouring the
// process-global atomic would let any earlier test that flipped it
// (`clear_session`, RPC 401 dispatch, `SessionExpiredSubscriber`)
// deadlock every subsequent caller of `wait_for_capacity`.
let _g = lock();
// Make sure we start clean: another test may have left the flag on.
set_signed_out(false);
assert!(!is_signed_out(), "precondition: not signed out");

set_signed_out(true);

assert_eq!(
current_policy(),
Policy::Paused {
reason: PauseReason::SignedOut
},
"signed_out override must trump init_global state"
Policy::Normal,
"with STATE uninit, signed_out must NOT change current_policy"
);

set_signed_out(false);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
assert!(!is_signed_out(), "override must be releasable");
}

#[tokio::test]
async fn signed_out_makes_wait_for_capacity_block_briefly() {
// We can't easily prove "it polls forever" without invasive setup
// of the poll-interval state, so we just confirm it doesn't hand
// back a permit synchronously while the override is on, then
// releases as soon as it's cleared.
async fn wait_for_capacity_acquires_immediately_when_signed_out_and_uninit() {
// Regression test for the
// `openhuman::agent::triage::evaluator::tests::*` hangs that surfaced
// after #1516 added the `SIGNED_OUT` override. Earlier tests in the
// same `cargo test` binary that exercise `clear_session` /
// `SessionExpiredSubscriber` / the RPC 401 path leave the
// process-global atomic stuck `true`. Without the
// `STATE.get().is_some()` gate, every subsequent `wait_for_capacity()`
// polls forever on the 60-second `paused_poll_ms` fallback (`STATE` is
// unset in tests, so the fallback is the unconfigured default).
let _g = lock();
set_signed_out(true);

let handle = tokio::spawn(async { wait_for_capacity().await });
tokio::time::sleep(TokioDuration::from_millis(40)).await;
assert!(
!handle.is_finished(),
"wait_for_capacity must block while signed out"
);
set_signed_out(false);
// Best-effort timeout to keep CI fast if the default poll interval
// is long (init_global isn't run in tests so STATE is None and the
// fallback is 60s) — abort and treat as pass on timeout.
if let Ok(joined) = timeout(TokioDuration::from_millis(200), handle).await {
let permit = joined.expect("join ok");
assert!(permit.is_some(), "permit returned after override cleared");
drop(permit);
}
// Ensure we leave the override clean for any later test.
let permit = timeout(TokioDuration::from_millis(500), wait_for_capacity())
.await
.expect("wait_for_capacity must NOT block when STATE is uninit, even if signed_out")
.expect("uninit gate still hands back a permit");
drop(permit);

set_signed_out(false);
}

Expand Down
Loading