From 315eb70e9b720039e55c264e14cf108981a23183 Mon Sep 17 00:00:00 2001 From: lee-to Date: Wed, 13 May 2026 20:54:07 +0300 Subject: [PATCH 1/2] fix: pin runtime selection per task stage --- .../agent/src/__tests__/subagentQuery.test.ts | 72 ++++++++++ packages/agent/src/coordinator.ts | 9 ++ packages/agent/src/subagentQuery.ts | 136 ++++++++++++++---- packages/api/src/services/taskEvents.ts | 6 + packages/data/src/__tests__/index.test.ts | 52 +++++++ packages/data/src/index.ts | 127 ++++++++++++++++ packages/shared/src/__tests__/db.test.ts | 24 +++- packages/shared/src/db.ts | 10 ++ packages/shared/src/index.ts | 1 + packages/shared/src/schema.ts | 2 + packages/shared/src/types.ts | 16 +++ 11 files changed, 420 insertions(+), 35 deletions(-) diff --git a/packages/agent/src/__tests__/subagentQuery.test.ts b/packages/agent/src/__tests__/subagentQuery.test.ts index a8d6294f..7196e1ea 100644 --- a/packages/agent/src/__tests__/subagentQuery.test.ts +++ b/packages/agent/src/__tests__/subagentQuery.test.ts @@ -10,6 +10,8 @@ const clearRuntimeProfileLimitSnapshotMock = vi.fn(); const notifyProjectRuntimeLimitBroadcastMock = vi.fn(); const saveTaskSessionIdMock = vi.fn(); const getTaskSessionIdMock = vi.fn<(taskId: string) => string | null>(() => null); +const saveTaskActiveRuntimeSelectionMock = vi.fn(); +const getTaskActiveRuntimeSelectionMock = vi.fn<() => Record | null>(() => null); const codexStartThreadMock = vi.fn(); const codexResumeThreadMock = vi.fn(); const expireStaleRuntimeWarmupSessionsMock = vi.fn(() => 0); @@ -41,6 +43,7 @@ const getAppDefaultRuntimeProfileIdMock = vi.fn< interface MockTaskRow { id: string; projectId: string; + status?: string; runtimeOptionsJson: string | null; modelOverride: string | null; branchName?: string | null; @@ -103,6 +106,8 @@ vi.mock("@aif/data", async (importOriginal) => { updateTaskHeartbeat: vi.fn(), renewTaskClaim: vi.fn(), persistRuntimeProfileLimitSnapshot: persistRuntimeProfileLimitSnapshotMock, + saveTaskActiveRuntimeSelection: saveTaskActiveRuntimeSelectionMock, + getTaskActiveRuntimeSelection: getTaskActiveRuntimeSelectionMock, saveTaskSessionId: saveTaskSessionIdMock, getTaskSessionId: getTaskSessionIdMock, expireStaleRuntimeWarmupSessions: expireStaleRuntimeWarmupSessionsMock, @@ -194,6 +199,9 @@ const { RuntimeExecutionError, createRuntimeWorkflowSpec } = await import("@aif/ const { executeSubagentQuery, resolveAdapterForTask } = await import("../subagentQuery.js"); beforeEach(() => { + saveTaskActiveRuntimeSelectionMock.mockReset(); + getTaskActiveRuntimeSelectionMock.mockReset(); + getTaskActiveRuntimeSelectionMock.mockReturnValue(null); expireStaleRuntimeWarmupSessionsMock.mockReset(); expireStaleRuntimeWarmupSessionsMock.mockReturnValue(0); findActiveReadyRuntimeWarmupSessionMock.mockReset(); @@ -1443,6 +1451,7 @@ describe("executeSubagentQuery model fallback policy", () => { findTaskByIdMock.mockReturnValue({ id: "task-1", projectId: "project-1", + status: "implementing", runtimeOptionsJson: null, modelOverride: null, }); @@ -1458,6 +1467,69 @@ describe("executeSubagentQuery model fallback policy", () => { const callOptions = queryMock.mock.calls[0][0].options as Record; expect(callOptions.model).toBe("profile-model"); + expect(saveTaskActiveRuntimeSelectionMock).toHaveBeenCalledWith( + "task-1", + expect.objectContaining({ + status: "implementing", + profileMode: "task", + runtimeId: "claude", + providerId: "anthropic", + profileId: "profile-1", + model: "profile-model", + }), + ); + }); + + it("uses pinned runtime selection for retries in the same status and profile mode", async () => { + findTaskByIdMock.mockReturnValue({ + id: "task-1", + projectId: "project-1", + status: "implementing", + runtimeOptionsJson: JSON.stringify({ effort: "new-effort" }), + modelOverride: "new-task-model", + }); + getTaskActiveRuntimeSelectionMock.mockReturnValue({ + status: "implementing", + profileMode: "task", + source: "project_default", + profileId: "profile-old", + runtimeId: "claude", + providerId: "anthropic", + transport: "sdk", + model: "pinned-model", + baseUrl: null, + apiKeyEnvVar: "ANTHROPIC_API_KEY", + headers: {}, + options: { effort: "medium" }, + pinnedAt: "2026-05-13T00:00:00.000Z", + }); + resolveEffectiveRuntimeProfileMock.mockReturnValue({ + source: "project_default", + profile: { + id: "profile-new", + runtimeId: "claude", + providerId: "anthropic", + defaultModel: "new-profile-model", + }, + taskRuntimeProfileId: null, + projectRuntimeProfileId: "profile-new", + systemRuntimeProfileId: null, + }); + queryMock.mockImplementation(makeDelayedSuccess(0, "ok")); + + await executeSubagentQuery({ + taskId: "task-1", + projectRoot: "/tmp/project", + agentName: "review-gate", + prompt: "check", + workflowKind: "review-gate", + }); + + const callOptions = queryMock.mock.calls[0][0].options as Record; + expect(callOptions.model).toBe("pinned-model"); + expect(callOptions.effort).toBe("medium"); + expect(resolveEffectiveRuntimeProfileMock).not.toHaveBeenCalled(); + expect(saveTaskActiveRuntimeSelectionMock).not.toHaveBeenCalled(); }); it("does not inject lightModel when no task override and no profile model", async () => { diff --git a/packages/agent/src/coordinator.ts b/packages/agent/src/coordinator.ts index 540b325d..901135b7 100644 --- a/packages/agent/src/coordinator.ts +++ b/packages/agent/src/coordinator.ts @@ -1,4 +1,5 @@ import { + clearTaskActiveRuntimeSelection, clearTaskRuntimeLimitSnapshot, blockTaskForRuntimeGateIfEligible, evaluateRuntimeLimitGate, @@ -363,6 +364,9 @@ async function processOneTask(task: TaskRow, stage: StatusTransition): Promise): string | null { return null; } +function normalizeOptionalString(value: string | null | undefined): string | null { + if (typeof value !== "string") return null; + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : null; +} + +function hydratePinnedRuntimeProfile( + selection: ReturnType, + workflow: RuntimeWorkflowSpec, +): ResolvedRuntimeProfile | null { + if (!selection) return null; + const apiKeyEnvVar = normalizeOptionalString(selection.apiKeyEnvVar); + const apiKey = apiKeyEnvVar ? normalizeOptionalString(process.env[apiKeyEnvVar]) : null; + + return { + source: selection.source, + profileId: selection.profileId, + runtimeId: selection.runtimeId, + providerId: selection.providerId, + transport: selection.transport, + baseUrl: selection.baseUrl, + apiKeyEnvVar, + apiKey, + model: selection.model, + headers: selection.headers, + options: selection.options, + workflow, + }; +} + function createRuntimeRegistryLogger(): RuntimeRegistryLogger { return { debug(context, message) { @@ -510,40 +543,83 @@ async function resolveExecutionContext(options: SubagentQueryOptions): Promise<{ const task = findTaskById(options.taskId); const profileMode = options.profileMode ?? "task"; const systemDefaultRuntimeProfileId = getAppDefaultRuntimeProfileId(profileMode); - const effective = resolveEffectiveRuntimeProfile({ - taskId: options.taskId, - projectId: task?.projectId, - mode: profileMode, - systemDefaultRuntimeProfileId, - }); const workflow = buildWorkflowSpec(options); - const runtimeOptionsOverride = parseRuntimeOptions(task?.runtimeOptionsJson); - const suppressModelFallback = options.suppressModelFallback === true; - const modelOverride = - options.modelOverride ?? (suppressModelFallback ? null : (task?.modelOverride ?? null)); + const pinnedSelection = task ? getTaskActiveRuntimeSelection(options.taskId) : null; + const canUsePinnedSelection = + pinnedSelection != null && + task?.status != null && + pinnedSelection.status === task.status && + pinnedSelection.profileMode === profileMode; + let resolved = canUsePinnedSelection + ? hydratePinnedRuntimeProfile(pinnedSelection, workflow) + : null; - const resolved = resolveRuntimeProfile({ - source: effective.source, - profile: effective.profile, - workflow, - modelOverride, - suppressModelFallback, - runtimeOptionsOverride, - fallbackRuntimeId: getEnv().AIF_DEFAULT_RUNTIME_ID, - fallbackProviderId: getEnv().AIF_DEFAULT_PROVIDER_ID, - env: process.env, - logger: { - debug(context, message) { - log.debug({ ...context }, `[runtime-resolution] ${message}`); - }, - info(context, message) { - log.info({ ...context }, `INFO [runtime-validation] ${message}`); + if (!resolved) { + const effective = resolveEffectiveRuntimeProfile({ + taskId: options.taskId, + projectId: task?.projectId, + mode: profileMode, + systemDefaultRuntimeProfileId, + }); + const runtimeOptionsOverride = parseRuntimeOptions(task?.runtimeOptionsJson); + const suppressModelFallback = options.suppressModelFallback === true; + const modelOverride = + options.modelOverride ?? (suppressModelFallback ? null : (task?.modelOverride ?? null)); + + resolved = resolveRuntimeProfile({ + source: effective.source, + profile: effective.profile, + workflow, + modelOverride, + suppressModelFallback, + runtimeOptionsOverride, + fallbackRuntimeId: getEnv().AIF_DEFAULT_RUNTIME_ID, + fallbackProviderId: getEnv().AIF_DEFAULT_PROVIDER_ID, + env: process.env, + logger: { + debug(context, message) { + log.debug({ ...context }, `[runtime-resolution] ${message}`); + }, + info(context, message) { + log.info({ ...context }, `INFO [runtime-validation] ${message}`); + }, + warn(context, message) { + log.warn({ ...context }, `WARN [runtime-validation] ${message}`); + }, }, - warn(context, message) { - log.warn({ ...context }, `WARN [runtime-validation] ${message}`); + }); + + if (task?.status) { + saveTaskActiveRuntimeSelection(options.taskId, { + status: task.status, + profileMode, + source: resolved.source, + profileId: resolved.profileId, + runtimeId: resolved.runtimeId, + providerId: resolved.providerId, + transport: resolved.transport, + model: resolved.model, + baseUrl: resolved.baseUrl, + apiKeyEnvVar: resolved.apiKeyEnvVar, + headers: resolved.headers, + options: resolved.options, + pinnedAt: new Date().toISOString(), + }); + } + } else { + log.info( + { + taskId: options.taskId, + profileMode, + status: task?.status ?? null, + runtimeId: resolved.runtimeId, + providerId: resolved.providerId, + profileId: resolved.profileId, }, - }, - }); + "Using pinned task runtime selection for subagent query", + ); + } + const suppressModelFallback = options.suppressModelFallback === true; // Resolve adapter after profile — lightModel is NOT injected into the // general resolution chain. Callers that need lightModel (reviewGate) diff --git a/packages/api/src/services/taskEvents.ts b/packages/api/src/services/taskEvents.ts index 838c0fd9..e4535de2 100644 --- a/packages/api/src/services/taskEvents.ts +++ b/packages/api/src/services/taskEvents.ts @@ -11,6 +11,7 @@ import { type TaskEvent, } from "@aif/shared"; import { + clearTaskActiveRuntimeSelection, findProjectById, findTaskById, getLatestHumanComment, @@ -219,6 +220,9 @@ function handleRegularTransition(input: EventHandlerInput): EventHandlerResult { } const nowIso = new Date().toISOString(); + if (event !== "retry_from_blocked") { + clearTaskActiveRuntimeSelection(task.id); + } setTaskFields(task.id, { ...transition.patch, lastHeartbeatAt: nowIso, updatedAt: nowIso }); const updated = findTaskById(task.id); @@ -304,6 +308,8 @@ function handleAcceptExistingPlan(input: EventHandlerInput): EventHandlerResult setTaskFields(input.taskId, { status: "plan_ready", + activeRuntimeStatus: null, + activeRuntimeSelectionJson: null, blockedReason: null, blockedFromStatus: null, retryAfter: null, diff --git a/packages/data/src/__tests__/index.test.ts b/packages/data/src/__tests__/index.test.ts index 8bb2b96d..be99bfb4 100644 --- a/packages/data/src/__tests__/index.test.ts +++ b/packages/data/src/__tests__/index.test.ts @@ -42,6 +42,9 @@ const { appendTaskActivityLog, updateTaskHeartbeat, updateTaskStatus, + saveTaskActiveRuntimeSelection, + getTaskActiveRuntimeSelection, + clearTaskActiveRuntimeSelection, incrementTaskTokenUsage, findTasksByRoadmapAlias, persistTaskPlanForTask, @@ -214,6 +217,55 @@ describe("data layer", () => { }); }); + describe("active runtime selection", () => { + it("persists and clears a stage-scoped runtime selection", () => { + const task = createTask({ projectId: "proj-1", title: "T", description: "D" }); + expect(task).toBeDefined(); + + updateTaskStatus(task!.id, "implementing"); + saveTaskActiveRuntimeSelection(task!.id, { + status: "implementing", + profileMode: "task", + source: "project_default", + profileId: "profile-1", + runtimeId: "claude", + providerId: "anthropic", + transport: "sdk", + model: "claude-sonnet", + baseUrl: null, + apiKeyEnvVar: "ANTHROPIC_API_KEY", + headers: {}, + options: { effort: "medium" }, + pinnedAt: "2026-05-13T00:00:00.000Z", + }); + + expect(getTaskActiveRuntimeSelection(task!.id)).toEqual( + expect.objectContaining({ + status: "implementing", + profileMode: "task", + runtimeId: "claude", + model: "claude-sonnet", + options: { effort: "medium" }, + }), + ); + + clearTaskActiveRuntimeSelection(task!.id); + expect(getTaskActiveRuntimeSelection(task!.id)).toBeNull(); + }); + + it("ignores malformed active runtime selection payloads", () => { + const task = createTask({ projectId: "proj-1", title: "T", description: "D" }); + expect(task).toBeDefined(); + + setTaskFields(task!.id, { + activeRuntimeStatus: "implementing", + activeRuntimeSelectionJson: JSON.stringify({ status: "implementing" }), + }); + + expect(getTaskActiveRuntimeSelection(task!.id)).toBeNull(); + }); + }); + describe("deleteTask", () => { it("deletes task and its comments", () => { const t = createTask({ projectId: "proj-1", title: "T", description: "D" }); diff --git a/packages/data/src/index.ts b/packages/data/src/index.ts index 3fecc562..0eb9765d 100644 --- a/packages/data/src/index.ts +++ b/packages/data/src/index.ts @@ -25,6 +25,7 @@ import { generatePlanPath, getEnv, getProjectConfig, + isRuntimeTransport, logger as createLogger, normalizeRuntimeLimitSnapshot, redactProviderText, @@ -52,6 +53,7 @@ import { type RuntimeLimitSnapshot, type RuntimeLimitWindow, type RuntimeLimitFutureHint, + type TaskActiveRuntimeSelection, type UpdateAppSettingsInput, type UpdateRuntimeProfileInput, type RuntimeWarmupSessionStatus, @@ -180,6 +182,8 @@ export function toTaskResponse(task: TaskRow): Task { tags, runtimeOptionsJson, autoReviewStateJson, + activeRuntimeSelectionJson: _activeRuntimeSelectionJson, + activeRuntimeStatus: _activeRuntimeStatus, runtimeLimitSnapshotJson, ...rest } = task; @@ -216,6 +220,89 @@ function parseRuntimeObject(raw: string | null | undefined): Record | null { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return null; + } + return value as Record; +} + +function parseStringRecord(value: unknown): Record | null { + const record = asRecord(value); + if (!record) return null; + + const result: Record = {}; + for (const [key, entryValue] of Object.entries(record)) { + if (typeof entryValue !== "string") { + return null; + } + result[key] = entryValue; + } + return result; +} + +function readOptionalString(value: Record, key: string): string | null | undefined { + const raw = value[key]; + if (raw === null) return null; + if (raw === undefined) return undefined; + return typeof raw === "string" ? raw : undefined; +} + +function parseTaskActiveRuntimeSelection( + raw: string | null | undefined, +): TaskActiveRuntimeSelection | null { + const parsed = parseRuntimeObject(raw); + if (!parsed) return null; + + const status = parsed.status; + const profileMode = parsed.profileMode; + const source = parsed.source; + const profileId = readOptionalString(parsed, "profileId"); + const runtimeId = parsed.runtimeId; + const providerId = parsed.providerId; + const transport = parsed.transport; + const model = readOptionalString(parsed, "model"); + const baseUrl = readOptionalString(parsed, "baseUrl"); + const apiKeyEnvVar = readOptionalString(parsed, "apiKeyEnvVar"); + const headers = parseStringRecord(parsed.headers); + const options = asRecord(parsed.options); + const pinnedAt = parsed.pinnedAt; + + if ( + typeof status !== "string" || + (profileMode !== "task" && profileMode !== "plan" && profileMode !== "review") || + typeof source !== "string" || + profileId === undefined || + typeof runtimeId !== "string" || + typeof providerId !== "string" || + !isRuntimeTransport(transport) || + model === undefined || + baseUrl === undefined || + apiKeyEnvVar === undefined || + !headers || + !options || + typeof pinnedAt !== "string" + ) { + return null; + } + + return { + status: status as TaskStatus, + profileMode, + source, + profileId, + runtimeId, + providerId, + transport, + model, + baseUrl, + apiKeyEnvVar, + headers, + options, + pinnedAt, + }; +} + interface RuntimeProfileUsageState { lastUsage: RuntimeProfileUsage; lastUsageAt: string; @@ -1639,6 +1726,46 @@ export function getTaskSessionId(taskId: string): string | null { return task?.sessionId ?? null; } +export function saveTaskActiveRuntimeSelection( + taskId: string, + selection: TaskActiveRuntimeSelection, +): void { + setTaskFields(taskId, { + activeRuntimeStatus: selection.status, + activeRuntimeSelectionJson: JSON.stringify(selection), + }); +} + +export function getTaskActiveRuntimeSelection( + taskId: string, +): TaskActiveRuntimeSelection | null { + const task = findTaskById(taskId); + if (!task?.activeRuntimeSelectionJson) return null; + + const selection = parseTaskActiveRuntimeSelection(task.activeRuntimeSelectionJson); + if (!selection) { + log.warn({ taskId }, "Ignoring malformed task active runtime selection"); + return null; + } + + if (task.activeRuntimeStatus && task.activeRuntimeStatus !== selection.status) { + log.warn( + { taskId, activeRuntimeStatus: task.activeRuntimeStatus, selectionStatus: selection.status }, + "Ignoring mismatched task active runtime selection", + ); + return null; + } + + return selection; +} + +export function clearTaskActiveRuntimeSelection(taskId: string): void { + setTaskFields(taskId, { + activeRuntimeStatus: null, + activeRuntimeSelectionJson: null, + }); +} + export function incrementTaskTokenUsage( taskId: string, usage: Record | null | undefined, diff --git a/packages/shared/src/__tests__/db.test.ts b/packages/shared/src/__tests__/db.test.ts index 6e0b1859..11d2c939 100644 --- a/packages/shared/src/__tests__/db.test.ts +++ b/packages/shared/src/__tests__/db.test.ts @@ -7,6 +7,8 @@ import { eq } from "drizzle-orm"; import { chatSessions } from "../schema.js"; import { closeDb, createTestDb, getDb } from "../db.js"; +const CURRENT_SCHEMA_VERSION = 22; + function removeSqliteArtifacts(dbPath: string): void { for (const path of [dbPath, `${dbPath}-wal`, `${dbPath}-shm`]) { try { @@ -396,12 +398,14 @@ describe("db", () => { "runtime_limit_updated_at", "branch_name", "worktree_path", + "active_runtime_status", + "active_runtime_selection_json", ]), ); expect(runtimeProfileColumns.map((column) => column.name)).toEqual( expect.arrayContaining(["runtime_limit_snapshot_json", "runtime_limit_updated_at"]), ); - expect(userVersion).toBe(21); + expect(userVersion).toBe(CURRENT_SCHEMA_VERSION); } finally { closeDb(); removeSqliteArtifacts(dbPath); @@ -451,11 +455,17 @@ describe("db", () => { `, ) .get() as { name: string } | undefined; + const taskColumns = migratedSqlite.prepare(`PRAGMA table_info(tasks)`).all() as Array<{ + name: string; + }>; const userVersion = migratedSqlite.pragma("user_version", { simple: true }) as number; migratedSqlite.close(); expect(dirtyIndex).toBeUndefined(); - expect(userVersion).toBe(21); + expect(taskColumns.map((column) => column.name)).toEqual( + expect.arrayContaining(["active_runtime_status", "active_runtime_selection_json"]), + ); + expect(userVersion).toBe(CURRENT_SCHEMA_VERSION); } finally { closeDb(); removeSqliteArtifacts(dbPath); @@ -520,12 +530,14 @@ describe("db", () => { "runtime_limit_updated_at", "branch_name", "worktree_path", + "active_runtime_status", + "active_runtime_selection_json", ]), ); expect(profileColumns.map((column) => column.name)).toEqual( expect.arrayContaining(["runtime_limit_snapshot_json", "runtime_limit_updated_at"]), ); - expect(userVersion).toBe(21); + expect(userVersion).toBe(CURRENT_SCHEMA_VERSION); } finally { closeDb(); removeSqliteArtifacts(dbPath); @@ -591,10 +603,12 @@ describe("db", () => { "auto_review_state_json", "runtime_limit_snapshot_json", "runtime_limit_updated_at", + "active_runtime_status", + "active_runtime_selection_json", ]), ); expect(warmupTable?.name).toBe("runtime_warmup_sessions"); - expect(userVersion).toBe(21); + expect(userVersion).toBe(CURRENT_SCHEMA_VERSION); } finally { closeDb(); removeSqliteArtifacts(dbPath); @@ -641,7 +655,7 @@ describe("db", () => { "idx_runtime_warmup_active_lookup", "idx_runtime_warmup_expires", ]); - expect(userVersion).toBe(21); + expect(userVersion).toBe(CURRENT_SCHEMA_VERSION); } finally { closeDb(); removeSqliteArtifacts(dbPath); diff --git a/packages/shared/src/db.ts b/packages/shared/src/db.ts index 0accc195..39f876c2 100644 --- a/packages/shared/src/db.ts +++ b/packages/shared/src/db.ts @@ -113,6 +113,8 @@ function ensureTables(sqlite: Database.Database): void { model_override TEXT, runtime_options_json TEXT, session_id TEXT, + active_runtime_status TEXT, + active_runtime_selection_json TEXT, runtime_limit_snapshot_json TEXT, runtime_limit_updated_at TEXT, locked_by TEXT, @@ -696,6 +698,14 @@ const MIGRATIONS: Migration[] = [ ); `, }, + { + version: 22, + description: "Add stage-scoped active runtime selection to tasks", + sql: ` + ALTER TABLE tasks ADD COLUMN active_runtime_status TEXT; + ALTER TABLE tasks ADD COLUMN active_runtime_selection_json TEXT; + `, + }, ]; function splitSqlStatements(sqlText: string): string[] { diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index baac30dd..f95e573b 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -62,6 +62,7 @@ export { type AppSettings, type UpdateAppSettingsInput, type Task, + type TaskActiveRuntimeSelection, type CreateTaskInput, type UpdateTaskInput, type TaskComment, diff --git a/packages/shared/src/schema.ts b/packages/shared/src/schema.ts index d9ad82e9..3b06e40f 100644 --- a/packages/shared/src/schema.ts +++ b/packages/shared/src/schema.ts @@ -97,6 +97,8 @@ export const tasks = sqliteTable("tasks", { modelOverride: text("model_override"), runtimeOptionsJson: text("runtime_options_json"), sessionId: text("session_id"), + activeRuntimeStatus: text("active_runtime_status").$type(), + activeRuntimeSelectionJson: text("active_runtime_selection_json"), runtimeLimitSnapshotJson: text("runtime_limit_snapshot_json"), runtimeLimitUpdatedAt: text("runtime_limit_updated_at"), lockedBy: text("locked_by"), diff --git a/packages/shared/src/types.ts b/packages/shared/src/types.ts index 3d6c711b..eb1994f3 100644 --- a/packages/shared/src/types.ts +++ b/packages/shared/src/types.ts @@ -152,6 +152,22 @@ export interface Task { updatedAt: string; } +export interface TaskActiveRuntimeSelection { + status: TaskStatus; + profileMode: "task" | "plan" | "review"; + source: string; + profileId: string | null; + runtimeId: string; + providerId: string; + transport: RuntimeTransport; + model: string | null; + baseUrl: string | null; + apiKeyEnvVar: string | null; + headers: Record; + options: Record; + pinnedAt: string; +} + export interface TaskComment { id: string; taskId: string; From 49037f131531a76ce72dda75e58a38757e76e33e Mon Sep 17 00:00:00 2001 From: lee-to Date: Thu, 14 May 2026 11:26:36 +0300 Subject: [PATCH 2/2] fix: gate stage runtime pinning --- .env.example | 4 ++ docs/architecture.md | 2 +- docs/configuration.md | 1 + .../agent/src/__tests__/coordinator.test.ts | 9 ++++ packages/agent/src/__tests__/hooks.test.ts | 1 + .../agent/src/__tests__/subagentQuery.test.ts | 50 ++++++++++++++++++- packages/agent/src/subagentQuery.ts | 11 ++-- packages/api/src/__tests__/tasks.test.ts | 30 +++++++++++ packages/shared/src/__tests__/env.test.ts | 13 +++++ .../shared/src/__tests__/turboEnv.test.ts | 1 + packages/shared/src/env.ts | 10 ++++ turbo.json | 1 + 12 files changed, 127 insertions(+), 6 deletions(-) diff --git a/.env.example b/.env.example index a78d81cb..fa2e26f8 100644 --- a/.env.example +++ b/.env.example @@ -98,6 +98,10 @@ API_RUNTIME_RUN_TIMEOUT_MS=120000 # Leave disabled until warmup callers are enabled and monitored. # AIF_RUNTIME_SESSION_FORK_ENABLED=false +# Opt-in rollout for stage-scoped runtime selection pins. +# When disabled, same-status retries resolve runtime profiles normally. +# AIF_STAGE_RUNTIME_PIN_ENABLED=false + # Opt-in OpenCode API transport workaround for long model generations. # When enabled, /session/:id/message uses an undici dispatcher with disabled # header/body idle timeouts while AGENT_STAGE_RUN_TIMEOUT_MS remains authoritative. diff --git a/docs/architecture.md b/docs/architecture.md index fffb709b..8a82760a 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -300,7 +300,7 @@ SQLite via `better-sqlite3` with `drizzle-orm` for type-safe queries. Schema is Key tables: -- **tasks** — task data, status, plan/logs, heartbeat metadata, runtime override fields (`runtime_profile_id`, `model_override`, `runtime_options_json`), runtime session id (`session_id`), auto-review convergence state (`manual_review_required`, `auto_review_state_json`), and task-level runtime-limit copy (`runtime_limit_snapshot_json`, `runtime_limit_updated_at`) +- **tasks** — task data, status, plan/logs, heartbeat metadata, runtime override fields (`runtime_profile_id`, `model_override`, `runtime_options_json`), runtime session id (`session_id`), internal stage-scoped runtime retry pin (`active_runtime_status`, `active_runtime_selection_json`), auto-review convergence state (`manual_review_required`, `auto_review_state_json`), and task-level runtime-limit copy (`runtime_limit_snapshot_json`, `runtime_limit_updated_at`). The active runtime fields are distinct from `session_id` and `runtime_limit_snapshot_json`; they store the runtime/profile/model/options selected for same-status retries and are cleared on stage or human transitions except `retry_from_blocked`. - **runtime_profiles** — project-scoped or global runtime/provider profiles with non-secret transport/model config plus authoritative runtime-limit state (`runtime_limit_snapshot_json`, `runtime_limit_updated_at`) - **projects** — project metadata plus default runtime profile ids for tasks and chat - **chat_sessions / chat_messages** — persisted chat state with runtime profile/session linkage diff --git a/docs/configuration.md b/docs/configuration.md index 1ced8933..756c2fb9 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -52,6 +52,7 @@ Node packages (`@aif/api`, `@aif/agent`, `@aif/data`, `@aif/shared`) auto-load e | `COORDINATOR_MAX_CONCURRENT_TASKS` | number | `3` | Max concurrent tasks per stage for parallel-enabled projects. Non-parallel projects always process 1 task at a time regardless of this value. Range 1–10 | | `AIF_TASK_WORKTREES_ENABLED` | boolean | `false` | Off-by-default rollout flag for per-task git worktrees. When `false`, branch-isolated projects (`git.create_branches=true`) stay serial and the API rejects parallel auto-queue for those projects. When `true`, full-mode planning for parallel branch-isolated projects provisions an isolated sibling git worktree and stores it on `tasks.worktree_path` | | `AIF_RUNTIME_SESSION_FORK_ENABLED` | boolean | `false` | Off-by-default rollout flag for runtime session forks. When `false`, adapters keep `supportsSessionFork=false` in descriptor/effective capabilities even if the transport implementation exists. When `true`, fork-capable transports expose `supportsSessionFork=true` so warmup callers can opt into `forkSession()` | +| `AIF_STAGE_RUNTIME_PIN_ENABLED` | boolean | `false` | Off-by-default rollout flag for stage-scoped task runtime-selection pins. When `false`, every subagent execution resolves the effective runtime profile normally and skips task pin lookup/persistence. When `true`, the agent stores the runtime/profile/model/options chosen at the start of a task status and reuses them for same-status retries until the task advances or a human transition clears the pin | | `AIF_RUNTIME_CODEX_NATIVE_SUBAGENTS_ENABLED` | boolean | `false` | Off-by-default rollout flag for Codex SDK native subagents. When `false`, Codex `native_subagents` workflows fall back to isolated `$aif-*` skill-session execution even if the profile requests `codexSubagentStrategy: "native"`. When `true`, Codex may use native subagents only for SDK profiles with required `.codex/agents/*.toml` and `.codex/config.toml` assets present; missing assets still fall back to isolated mode | | `AIF_RUNTIME_OPENCODE_LONG_RUNNING_DISPATCHER_ENABLED` | boolean | `false` | Off-by-default rollout flag for long-running OpenCode API session messages. When `false`, OpenCode `/session/:id/message` uses the default fetch behavior. When `true`, that message POST uses an undici dispatcher with disabled header/body idle timeouts while the adapter AbortController timeout remains authoritative | | `AGENT_BYPASS_PERMISSIONS` | boolean | `true` | Provider-neutral bypass flag. When `true`, subagents run without approval prompts and without any OS-level sandbox. Each adapter translates per its native mechanism — Claude: `--dangerously-skip-permissions`; Codex: `approval_policy=never` + `sandbox_mode=danger-full-access`. When `false`, each adapter falls back to its safer default (Claude: `.claude/settings.json` allow rules; Codex: `approval_policy=on-request` + `sandbox_mode=workspace-write`). See `docs/providers.md` § Bypass semantics | diff --git a/packages/agent/src/__tests__/coordinator.test.ts b/packages/agent/src/__tests__/coordinator.test.ts index 9d334854..9520caa3 100644 --- a/packages/agent/src/__tests__/coordinator.test.ts +++ b/packages/agent/src/__tests__/coordinator.test.ts @@ -256,8 +256,15 @@ describe("coordinator", () => { projectId: "test-project", title: "Resume impl", status: "implementing", + activeRuntimeStatus: "implementing", + activeRuntimeSelectionJson: JSON.stringify({ status: "implementing" }), }) .run(); + vi.mocked(runImplementer).mockImplementationOnce(async () => { + const task = db.select().from(tasks).where(eq(tasks.id, "task-impl")).get(); + expect(task!.activeRuntimeStatus).toBe("implementing"); + expect(task!.activeRuntimeSelectionJson).toBe(JSON.stringify({ status: "implementing" })); + }); await pollAndProcess(); @@ -266,6 +273,8 @@ describe("coordinator", () => { expect(runReviewer).toHaveBeenCalledWith("task-impl", "/tmp/test"); const task = db.select().from(tasks).where(eq(tasks.id, "task-impl")).get(); expect(task!.status).toBe("done"); + expect(task!.activeRuntimeStatus).toBeNull(); + expect(task!.activeRuntimeSelectionJson).toBeNull(); }); it("should pick up review tasks and dispatch reviewer", async () => { diff --git a/packages/agent/src/__tests__/hooks.test.ts b/packages/agent/src/__tests__/hooks.test.ts index b71a1fe5..3f5b6c2f 100644 --- a/packages/agent/src/__tests__/hooks.test.ts +++ b/packages/agent/src/__tests__/hooks.test.ts @@ -68,6 +68,7 @@ function makeEnv(overrides: Record = {}) { AGENT_USE_SUBAGENTS: true, AGENT_FIRST_ACTIVITY_TIMEOUT_MS: 60_000, AIF_USAGE_LIMITS_ENABLED: false, + AIF_STAGE_RUNTIME_PIN_ENABLED: false, AIF_WARMUP_ENABLED: false, AIF_RUNTIME_CODEX_NATIVE_SUBAGENTS_ENABLED: false, AIF_TASK_WORKTREES_ENABLED: false, diff --git a/packages/agent/src/__tests__/subagentQuery.test.ts b/packages/agent/src/__tests__/subagentQuery.test.ts index 7196e1ea..6290018c 100644 --- a/packages/agent/src/__tests__/subagentQuery.test.ts +++ b/packages/agent/src/__tests__/subagentQuery.test.ts @@ -152,6 +152,7 @@ const baseMockEnv = { AGENT_USE_SUBAGENTS: true, AGENT_FIRST_ACTIVITY_TIMEOUT_MS: 60_000, AIF_USAGE_LIMITS_ENABLED: true, + AIF_STAGE_RUNTIME_PIN_ENABLED: false, AIF_WARMUP_ENABLED: false, AIF_RUNTIME_CODEX_NATIVE_SUBAGENTS_ENABLED: false, TELEGRAM_BOT_TOKEN: undefined, @@ -199,6 +200,9 @@ const { RuntimeExecutionError, createRuntimeWorkflowSpec } = await import("@aif/ const { executeSubagentQuery, resolveAdapterForTask } = await import("../subagentQuery.js"); beforeEach(() => { + for (const key of Object.keys(mockEnvOverrides)) { + delete mockEnvOverrides[key]; + } saveTaskActiveRuntimeSelectionMock.mockReset(); getTaskActiveRuntimeSelectionMock.mockReset(); getTaskActiveRuntimeSelectionMock.mockReturnValue(null); @@ -1447,7 +1451,48 @@ describe("executeSubagentQuery model fallback policy", () => { expect(callOptions.model).toBe("task-model"); }); - it("uses profile defaultModel when no task override", async () => { + it("skips active runtime pin lookup and persistence when the rollout flag is disabled", async () => { + findTaskByIdMock.mockReturnValue({ + id: "task-1", + projectId: "project-1", + status: "implementing", + runtimeOptionsJson: null, + modelOverride: null, + }); + getTaskActiveRuntimeSelectionMock.mockReturnValue({ + status: "implementing", + profileMode: "task", + source: "project_default", + profileId: "profile-old", + runtimeId: "claude", + providerId: "anthropic", + transport: "sdk", + model: "pinned-model", + baseUrl: null, + apiKeyEnvVar: "ANTHROPIC_API_KEY", + headers: {}, + options: { effort: "medium" }, + pinnedAt: "2026-05-13T00:00:00.000Z", + }); + queryMock.mockImplementation(makeDelayedSuccess(0, "ok")); + + await executeSubagentQuery({ + taskId: "task-1", + projectRoot: "/tmp/project", + agentName: "review-gate", + prompt: "check", + workflowKind: "review-gate", + }); + + const callOptions = queryMock.mock.calls[0][0].options as Record; + expect(callOptions.model).toBe("profile-model"); + expect(resolveEffectiveRuntimeProfileMock).toHaveBeenCalled(); + expect(getTaskActiveRuntimeSelectionMock).not.toHaveBeenCalled(); + expect(saveTaskActiveRuntimeSelectionMock).not.toHaveBeenCalled(); + }); + + it("persists active runtime selection when the rollout flag is enabled", async () => { + mockEnvOverrides.AIF_STAGE_RUNTIME_PIN_ENABLED = true; findTaskByIdMock.mockReturnValue({ id: "task-1", projectId: "project-1", @@ -1478,9 +1523,11 @@ describe("executeSubagentQuery model fallback policy", () => { model: "profile-model", }), ); + delete mockEnvOverrides.AIF_STAGE_RUNTIME_PIN_ENABLED; }); it("uses pinned runtime selection for retries in the same status and profile mode", async () => { + mockEnvOverrides.AIF_STAGE_RUNTIME_PIN_ENABLED = true; findTaskByIdMock.mockReturnValue({ id: "task-1", projectId: "project-1", @@ -1530,6 +1577,7 @@ describe("executeSubagentQuery model fallback policy", () => { expect(callOptions.effort).toBe("medium"); expect(resolveEffectiveRuntimeProfileMock).not.toHaveBeenCalled(); expect(saveTaskActiveRuntimeSelectionMock).not.toHaveBeenCalled(); + delete mockEnvOverrides.AIF_STAGE_RUNTIME_PIN_ENABLED; }); it("does not inject lightModel when no task override and no profile model", async () => { diff --git a/packages/agent/src/subagentQuery.ts b/packages/agent/src/subagentQuery.ts index b17ed3bb..a2d1c20c 100644 --- a/packages/agent/src/subagentQuery.ts +++ b/packages/agent/src/subagentQuery.ts @@ -542,9 +542,12 @@ async function resolveExecutionContext(options: SubagentQueryOptions): Promise<{ }> { const task = findTaskById(options.taskId); const profileMode = options.profileMode ?? "task"; + const env = getEnv(); const systemDefaultRuntimeProfileId = getAppDefaultRuntimeProfileId(profileMode); const workflow = buildWorkflowSpec(options); - const pinnedSelection = task ? getTaskActiveRuntimeSelection(options.taskId) : null; + const stageRuntimePinEnabled = env.AIF_STAGE_RUNTIME_PIN_ENABLED; + const pinnedSelection = + stageRuntimePinEnabled && task ? getTaskActiveRuntimeSelection(options.taskId) : null; const canUsePinnedSelection = pinnedSelection != null && task?.status != null && @@ -573,8 +576,8 @@ async function resolveExecutionContext(options: SubagentQueryOptions): Promise<{ modelOverride, suppressModelFallback, runtimeOptionsOverride, - fallbackRuntimeId: getEnv().AIF_DEFAULT_RUNTIME_ID, - fallbackProviderId: getEnv().AIF_DEFAULT_PROVIDER_ID, + fallbackRuntimeId: env.AIF_DEFAULT_RUNTIME_ID, + fallbackProviderId: env.AIF_DEFAULT_PROVIDER_ID, env: process.env, logger: { debug(context, message) { @@ -589,7 +592,7 @@ async function resolveExecutionContext(options: SubagentQueryOptions): Promise<{ }, }); - if (task?.status) { + if (stageRuntimePinEnabled && task?.status) { saveTaskActiveRuntimeSelection(options.taskId, { status: task.status, profileMode, diff --git a/packages/api/src/__tests__/tasks.test.ts b/packages/api/src/__tests__/tasks.test.ts index ba67a708..f1ed059c 100644 --- a/packages/api/src/__tests__/tasks.test.ts +++ b/packages/api/src/__tests__/tasks.test.ts @@ -1902,6 +1902,31 @@ describe("tasks API", () => { expect(body.lastHeartbeatAt).toBeTruthy(); }); + it("should clear active runtime selection on human status transitions", async () => { + const db = testDb.current; + db.insert(tasks) + .values({ + id: "ev-clear-runtime-pin", + projectId: "test-project", + title: "Clear runtime pin", + status: "done", + activeRuntimeStatus: "done", + activeRuntimeSelectionJson: JSON.stringify({ status: "done" }), + }) + .run(); + + const res = await app.request("/tasks/ev-clear-runtime-pin/events", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ event: "request_changes" }), + }); + + expect(res.status).toBe(200); + const task = db.select().from(tasks).where(eq(tasks.id, "ev-clear-runtime-pin")).get(); + expect(task!.activeRuntimeStatus).toBeNull(); + expect(task!.activeRuntimeSelectionJson).toBeNull(); + }); + it("should send plan_ready task back to planning on request_replanning", async () => { const db = testDb.current; db.insert(tasks) @@ -1935,6 +1960,8 @@ describe("tasks API", () => { status: "blocked_external", blockedFromStatus: "implementing", blockedReason: "rate limit", + activeRuntimeStatus: "implementing", + activeRuntimeSelectionJson: JSON.stringify({ status: "implementing" }), }) .run(); @@ -1950,6 +1977,9 @@ describe("tasks API", () => { expect(body.blockedFromStatus).toBeNull(); expect(body.blockedReason).toBeNull(); expect(body.retryAfter).toBeNull(); + const task = db.select().from(tasks).where(eq(tasks.id, "ev-3")).get(); + expect(task!.activeRuntimeStatus).toBe("implementing"); + expect(task!.activeRuntimeSelectionJson).toBe(JSON.stringify({ status: "implementing" })); }); it("should reject retry_from_blocked without blockedFromStatus", async () => { diff --git a/packages/shared/src/__tests__/env.test.ts b/packages/shared/src/__tests__/env.test.ts index eaa046ce..244d1e07 100644 --- a/packages/shared/src/__tests__/env.test.ts +++ b/packages/shared/src/__tests__/env.test.ts @@ -65,6 +65,7 @@ describe("env validation", () => { expect(result.AGENT_MAX_REVIEW_ITERATIONS).toBe(3); expect(result.AGENT_USE_SUBAGENTS).toBe(false); expect(result.AIF_WARMUP_ENABLED).toBe(false); + expect(result.AIF_STAGE_RUNTIME_PIN_ENABLED).toBe(false); expect(result.AIF_TASK_WORKTREES_ENABLED).toBe(false); expect(result.AIF_RUNTIME_SESSION_FORK_ENABLED).toBe(false); expect(result.AIF_RUNTIME_CODEX_NATIVE_SUBAGENTS_ENABLED).toBe(false); @@ -82,16 +83,28 @@ describe("env validation", () => { const enabled = validateEnv({ AIF_RUNTIME_CODEX_NATIVE_SUBAGENTS_ENABLED: "yes", AIF_RUNTIME_OPENCODE_LONG_RUNNING_DISPATCHER_ENABLED: "on", + AIF_STAGE_RUNTIME_PIN_ENABLED: "true", }); expect(enabled.AIF_RUNTIME_CODEX_NATIVE_SUBAGENTS_ENABLED).toBe(true); expect(enabled.AIF_RUNTIME_OPENCODE_LONG_RUNNING_DISPATCHER_ENABLED).toBe(true); + expect(enabled.AIF_STAGE_RUNTIME_PIN_ENABLED).toBe(true); const disabled = validateEnv({ AIF_RUNTIME_CODEX_NATIVE_SUBAGENTS_ENABLED: "no", AIF_RUNTIME_OPENCODE_LONG_RUNNING_DISPATCHER_ENABLED: "off", + AIF_STAGE_RUNTIME_PIN_ENABLED: "0", }); expect(disabled.AIF_RUNTIME_CODEX_NATIVE_SUBAGENTS_ENABLED).toBe(false); expect(disabled.AIF_RUNTIME_OPENCODE_LONG_RUNNING_DISPATCHER_ENABLED).toBe(false); + expect(disabled.AIF_STAGE_RUNTIME_PIN_ENABLED).toBe(false); + }); + + it("should reject invalid stage runtime pin flag values", () => { + expect(() => + validateEnv({ + AIF_STAGE_RUNTIME_PIN_ENABLED: "maybe", + }), + ).toThrow(); }); it("should accept missing ANTHROPIC_API_KEY (uses ~/.claude/ auth)", () => { diff --git a/packages/shared/src/__tests__/turboEnv.test.ts b/packages/shared/src/__tests__/turboEnv.test.ts index 8f6f6f77..cefcc7fc 100644 --- a/packages/shared/src/__tests__/turboEnv.test.ts +++ b/packages/shared/src/__tests__/turboEnv.test.ts @@ -23,6 +23,7 @@ describe("Turbo environment passthrough", () => { "AIF_RUNTIME_CODEX_NATIVE_SUBAGENTS_ENABLED", "AIF_RUNTIME_OPENCODE_LONG_RUNNING_DISPATCHER_ENABLED", "AIF_RUNTIME_SESSION_FORK_ENABLED", + "AIF_STAGE_RUNTIME_PIN_ENABLED", "AIF_TASK_WORKTREES_ENABLED", "AIF_USAGE_LIMITS_ENABLED", "AIF_WARMUP_ENABLED", diff --git a/packages/shared/src/env.ts b/packages/shared/src/env.ts index 906bdf78..2d7d32af 100644 --- a/packages/shared/src/env.ts +++ b/packages/shared/src/env.ts @@ -124,6 +124,16 @@ const envSchema = z.object({ return value; }, z.boolean()) .default(false), + AIF_STAGE_RUNTIME_PIN_ENABLED: z + .preprocess((value) => { + if (typeof value === "string") { + const normalized = value.trim().toLowerCase(); + if (BOOLEAN_TRUE_VALUES.has(normalized)) return true; + if (BOOLEAN_FALSE_VALUES.has(normalized)) return false; + } + return value; + }, z.boolean()) + .default(false), AIF_WARMUP_ENABLED: z .preprocess((value) => { if (typeof value === "string") { diff --git a/turbo.json b/turbo.json index 1fefe443..5e80bd9e 100644 --- a/turbo.json +++ b/turbo.json @@ -5,6 +5,7 @@ "AIF_RUNTIME_CODEX_NATIVE_SUBAGENTS_ENABLED", "AIF_RUNTIME_OPENCODE_LONG_RUNNING_DISPATCHER_ENABLED", "AIF_USAGE_LIMITS_ENABLED", + "AIF_STAGE_RUNTIME_PIN_ENABLED", "AIF_WARMUP_ENABLED", "AIF_TASK_WORKTREES_ENABLED", "AIF_ENABLE_CODEX_LOGIN_PROXY",