From 9c471e599ad33e5987c3d5182363a61b1266b5a8 Mon Sep 17 00:00:00 2001 From: Simon Iribarren Date: Fri, 8 May 2026 00:04:30 +0200 Subject: [PATCH 1/6] QVAC-18181 feat[api]: introduce request lifecycle primitives with signal-based cancel Introduce DisposableScope, RequestContext, and RequestRegistry to formalize long-running request lifecycle management and route cancellation through AbortSignal semantics. Add client-generated requestId support on completion runs and a cancel({ requestId }) API, while preserving model-wide cancellation via registry-backed cancel({ modelId }). This removes legacy cancel-counter bookkeeping in the llama.cpp completion path so KV-cache accounting reflects actual signal-driven cancellation behavior. --- packages/sdk/client/api/cancel.ts | 47 +++- packages/sdk/client/api/completion-stream.ts | 36 +++ packages/sdk/examples/cancel-by-request-id.ts | 70 +++++ packages/sdk/schemas/cancel.ts | 41 +++ packages/sdk/schemas/completion-event.ts | 8 + packages/sdk/schemas/completion-stream.ts | 31 ++- packages/sdk/schemas/sdk-errors-server.ts | 12 + packages/sdk/scripts/run-unit-tests.ts | 18 +- packages/sdk/server/bare/ops/cancel.ts | 42 ++- .../ops/completion-stream.ts | 50 +++- .../llamacpp-completion/ops/kv-cache-state.ts | 55 +--- .../plugins/llamacpp-completion/plugin.ts | 31 ++- .../server/bare/runtime/disposable-scope.ts | 92 ++++++ packages/sdk/server/bare/runtime/index.ts | 22 ++ .../server/bare/runtime/request-context.ts | 66 +++++ .../sdk/server/bare/runtime/request-id.ts | 15 + .../runtime/request-registry-singleton.ts | 23 ++ .../server/bare/runtime/request-registry.ts | 253 +++++++++++++++++ .../sdk/server/rpc/handlers/cancelHandler.ts | 26 +- .../sdk/test/unit/kv-cache-cancel.test.ts | 210 ++------------ .../unit/runtime/disposable-scope.test.ts | 153 ++++++++++ .../unit/runtime/request-registry.test.ts | 263 ++++++++++++++++++ packages/sdk/types/bare-crypto/index.d.ts | 1 + packages/sdk/utils/errors-server.ts | 24 ++ 24 files 
changed, 1303 insertions(+), 286 deletions(-) create mode 100644 packages/sdk/examples/cancel-by-request-id.ts create mode 100644 packages/sdk/server/bare/runtime/disposable-scope.ts create mode 100644 packages/sdk/server/bare/runtime/index.ts create mode 100644 packages/sdk/server/bare/runtime/request-context.ts create mode 100644 packages/sdk/server/bare/runtime/request-id.ts create mode 100644 packages/sdk/server/bare/runtime/request-registry-singleton.ts create mode 100644 packages/sdk/server/bare/runtime/request-registry.ts create mode 100644 packages/sdk/test/unit/runtime/disposable-scope.test.ts create mode 100644 packages/sdk/test/unit/runtime/request-registry.test.ts diff --git a/packages/sdk/client/api/cancel.ts b/packages/sdk/client/api/cancel.ts index 74134e6ba3..5e86d31c45 100644 --- a/packages/sdk/client/api/cancel.ts +++ b/packages/sdk/client/api/cancel.ts @@ -1,21 +1,40 @@ import { send } from "@/client/rpc/rpc-client"; -import { type CancelParams, type CancelRequest } from "@/schemas"; +import { + type CancelClientInput, + type CancelParams, + type CancelRequest, +} from "@/schemas"; import { InvalidResponseError, CancelFailedError } from "@/utils/errors-client"; /** * Cancels an ongoing operation. * + * Two cancel paths are supported: + * + * - **By `requestId`** (introduced in 0.11.0, primary path) — pass the + * `requestId` exposed on the result of a long-running call (e.g. + * `(await completion({ ... })).requestId`) to cancel exactly that + * request. Either pass `{ requestId }` directly or the explicit + * `{ operation: "request", requestId }` form; both are equivalent. + * - **By `modelId`** (broad-cancel escape hatch, kept indefinitely) — + * `{ operation: "inference" | "embeddings", modelId }` cancels every + * in-flight request running on that model. Useful for model unload, + * app shutdown, or "cancel everything" admin paths where the caller + * doesn't have a `requestId` to hand. 
+ * + * The download and RAG cancel paths are unchanged in 0.11.0; they still + * route through their own existing handlers. + * * @param params - The parameters for the cancellation - * @param params.operation - The type of operation to cancel ("inference", "downloadAsset", or "rag") - * @param params.modelId - The model ID (required for inference cancellation) - * @param params.downloadKey - The download key (required for download cancellation) - * @param params.clearCache - If true, deletes the partial download file (default: false) - * @param params.delegate - Delegation target for remote download cancellation (optional) - * @param params.workspace - The RAG workspace to cancel (optional, defaults to "default") * @throws {QvacErrorBase} When the response type is invalid or when the cancellation fails * * @example - * // Cancel inference + * // Cancel a specific completion by requestId (new in 0.11.0) + * const run = completion({ ... }); + * await cancel({ requestId: run.requestId }); + * + * @example + * // Broad-cancel every inference running on a model (escape hatch) * await cancel({ operation: "inference", modelId: "model-123" }); * * @example @@ -42,10 +61,11 @@ import { InvalidResponseError, CancelFailedError } from "@/utils/errors-client"; * // Cancel RAG operation on specific workspace * await cancel({ operation: "rag", workspace: "my-workspace" }); */ -export async function cancel(params: CancelParams) { +export async function cancel(params: CancelClientInput) { + const wireParams = normalizeCancelParams(params); const request: CancelRequest = { type: "cancel", - ...params, + ...wireParams, }; const response = await send(request); @@ -57,3 +77,10 @@ export async function cancel(params: CancelParams) { throw new CancelFailedError(response.error); } } + +function normalizeCancelParams(params: CancelClientInput): CancelParams { + if (!("operation" in params) && "requestId" in params) { + return { operation: "request", requestId: params.requestId }; + } + 
return params; +} diff --git a/packages/sdk/client/api/completion-stream.ts b/packages/sdk/client/api/completion-stream.ts index eac63fd1ab..18a52f0f26 100644 --- a/packages/sdk/client/api/completion-stream.ts +++ b/packages/sdk/client/api/completion-stream.ts @@ -129,6 +129,14 @@ type CompletionParams = Omit & { * ``` */ export function completion(params: CompletionParams): CompletionRun { + // Stable identity for this run, generated client-side so it's + // available synchronously the moment we return — before the first + // network round-trip and therefore before the user could possibly + // have a "stop" handler. Surfaced on the returned `CompletionRun` + // (`run.requestId`) so callers can `cancel({ requestId })` at any + // point during the stream. + const requestId = generateClientRequestId(); + let statsResolver: (value: CompletionStats | undefined) => void = () => {}; let statsRejecter: (error: unknown) => void = () => {}; const statsPromise = new Promise( @@ -226,6 +234,7 @@ export function completion(params: CompletionParams): CompletionRun { emitRawDeltas: params.emitRawDeltas, toolDialect: params.toolDialect, responseFormat: params.responseFormat, + requestId, }; const responses: AsyncGenerator = streamRpc( @@ -347,6 +356,7 @@ export function completion(params: CompletionParams): CompletionRun { })(); return { + requestId, events: eventStream, final: finalPromise, tokenStream, @@ -365,6 +375,7 @@ export function completion(params: CompletionParams): CompletionRun { })() as AsyncGenerator; return { + requestId, events: eventStream, final: finalPromise, tokenStream, @@ -375,3 +386,28 @@ export function completion(params: CompletionParams): CompletionRun { }; } } + +/** + * UUIDv4 generator for client-side request ids. The Web Crypto API ships + * `crypto.randomUUID` everywhere we run today (Bun, modern Node, modern + * browsers, React Native via the polyfill that the workbench-desktop / + * RN runtime config injects). 
The fallback exists so the SDK never + * crashes in an exotic JS environment without `crypto.randomUUID` — + * `requestId` semantics still hold (uniqueness, opaque to the caller), + * just without the UUIDv4 wire shape. + */ +function generateClientRequestId(): string { + const c = ( + globalThis as { + crypto?: { randomUUID?: () => string }; + } + ).crypto; + if (c?.randomUUID) return c.randomUUID(); + // Fallback: 128 random bits encoded as a hex string. Distinct enough + // for in-flight cancel targeting; not a wire-spec UUID. + const bytes = new Uint8Array(16); + for (let i = 0; i < bytes.length; i++) { + bytes[i] = Math.floor(Math.random() * 256); + } + return Array.from(bytes, (b) => b.toString(16).padStart(2, "0")).join(""); +} diff --git a/packages/sdk/examples/cancel-by-request-id.ts b/packages/sdk/examples/cancel-by-request-id.ts new file mode 100644 index 0000000000..507d75e86c --- /dev/null +++ b/packages/sdk/examples/cancel-by-request-id.ts @@ -0,0 +1,70 @@ +/** + * Cancel a specific in-flight completion by `requestId`. + * + * `completion(...)` exposes a stable `requestId` (UUIDv4, generated + * client-side) on the returned `CompletionRun`. Pass it to + * `cancel({ requestId })` to abort that exact run without affecting any + * other inference happening on the same model. + * + * Two cancel paths exist: + * + * 1. `cancel({ requestId })` — targeted cancel, the primary path + * introduced in 0.11.0. Available as soon as `completion()` returns, + * before the first network round-trip — so a user clicking "stop" + * during the queueing phase still hits the right run. + * 2. `cancel({ operation: "inference", modelId })` — broad cancel + * (escape hatch, kept indefinitely). Cancels every inference running + * on the model. Useful for unload, app shutdown, admin sweeps when + * the caller doesn't have a `requestId` to hand. 
+ */ + +import { + cancel, + completion, + loadModel, + unloadModel, + QWEN3_600M_INST_Q4, +} from "@qvac/sdk"; + +try { + const modelId = await loadModel({ + modelSrc: QWEN3_600M_INST_Q4, + modelType: "llm", + modelConfig: { ctx_size: 4096 }, + }); + + const run = completion({ + modelId, + history: [ + { + role: "user", + content: + "Write a long, detailed essay about the history of the Roman Empire.", + }, + ], + stream: true, + }); + + console.log(`requestId: ${run.requestId}`); + + // Cancel after a short delay so we exercise the cancel-mid-decode path. + setTimeout(() => { + void cancel({ requestId: run.requestId }); + console.log("(cancel issued)"); + }, 250); + + let tokenCount = 0; + for await (const event of run.events) { + if (event.type === "contentDelta") { + tokenCount++; + process.stdout.write(event.text); + } + } + console.log(`\n\nstreamed ${tokenCount} content deltas before cancel.`); + + await unloadModel({ modelId }); + process.exit(0); +} catch (error) { + console.error("Error:", error); + process.exit(1); +} diff --git a/packages/sdk/schemas/cancel.ts b/packages/sdk/schemas/cancel.ts index 50970ee2f5..4632efeebd 100644 --- a/packages/sdk/schemas/cancel.ts +++ b/packages/sdk/schemas/cancel.ts @@ -32,11 +32,32 @@ const cancelEmbeddingsParamsSchema = cancelInferenceBaseSchema.extend({ operation: z.literal("embeddings").describe("Operation type"), }); +/** + * Targeted cancel by `requestId` — the primary cancel path introduced in + * SDK 0.11.0. Pair with the `requestId` field exposed on `CompletionRun` + * (and equivalent long-running result objects) to cancel a specific + * in-flight request rather than every request running on a given model. + * + * The pre-existing `{ operation: "inference", modelId }` form is kept as + * a broad-cancel escape hatch for "cancel everything on this model" + * scenarios (model unload, app shutdown, admin sweeps). 
+ */ +const cancelByRequestIdParamsSchema = z.object({ + operation: z.literal("request").describe("Operation type"), + requestId: z + .string() + .min(1) + .describe( + "Identifier of the specific in-flight request to cancel — the value exposed on the result object returned by long-running calls (e.g. `completion(...)`).", + ), +}); + const cancelParamsSchema = z.discriminatedUnion("operation", [ cancelInferenceParamsSchema, cancelDownloadParamsSchema, cancelRagParamsSchema, cancelEmbeddingsParamsSchema, + cancelByRequestIdParamsSchema, ]); export const cancelRequestSchema = z.intersection( @@ -50,9 +71,29 @@ export const cancelResponseSchema = z.object({ error: z.string().optional(), }); +/** + * Sugar for the most common new path — `cancel({ requestId })`. The client + * accepts either this shape (no `operation`) or the explicit + * `{ operation: "request", requestId }` and normalises before sending. + */ +export const cancelByRequestIdSugarSchema = z + .object({ + requestId: z.string().min(1), + }) + .strict(); + export type CancelParams = z.infer; export type CancelInferenceBaseParams = z.infer< typeof cancelInferenceBaseSchema >; +export type CancelByRequestIdParams = z.infer< + typeof cancelByRequestIdParamsSchema +>; +export type CancelByRequestIdSugar = z.infer< + typeof cancelByRequestIdSugarSchema +>; export type CancelRequest = z.infer; export type CancelResponse = z.infer; + +/** Public client-API input — accepts the wire union *or* the requestId sugar. 
*/ +export type CancelClientInput = CancelParams | CancelByRequestIdSugar; diff --git a/packages/sdk/schemas/completion-event.ts b/packages/sdk/schemas/completion-event.ts index 8eb4f0bcd0..21b7e09cf7 100644 --- a/packages/sdk/schemas/completion-event.ts +++ b/packages/sdk/schemas/completion-event.ts @@ -133,6 +133,14 @@ export type CompletionFinal = { }; export type CompletionRun = { + /** + * Stable identifier for this run, generated client-side at call time + * (UUIDv4) and available synchronously the moment `completion(...)` + * returns — before the first network round-trip. Pass it to + * `cancel({ requestId })` to target this specific request without + * affecting any other inference running on the same model. + */ + requestId: string; /** Ordered stream of typed completion events — the canonical consumption API. */ events: AsyncIterable; /** Resolves when the stream ends with aggregated content, thinking, tool calls, stats, and raw output. */ diff --git a/packages/sdk/schemas/completion-stream.ts b/packages/sdk/schemas/completion-stream.ts index 7f4b6fa1bb..72839c1999 100644 --- a/packages/sdk/schemas/completion-stream.ts +++ b/packages/sdk/schemas/completion-stream.ts @@ -2,7 +2,10 @@ import { z } from "zod"; import { toolSchema } from "./tools"; import { completionEventSchema } from "./completion-event"; -export { completionStatsSchema, type CompletionStats } from "./completion-event"; +export { + completionStatsSchema, + type CompletionStats, +} from "./completion-event"; /** * Tool-call output dialect. 
Auto-detected from the model name; pass via @@ -14,7 +17,12 @@ export { completionStatsSchema, type CompletionStats } from "./completion-event" * - `"json"`: `{"name":"get_weather","arguments":{"city":"Tokyo"}}` or `{"tool_calls":[{"name":"...","arguments":{...}}]}` * - `"harmony"`: `<|channel|>commentary to=functions.get_weather <|constrain|>json<|message|>{"city":"Tokyo"}<|call|>` */ -export const toolDialectSchema = z.enum(["hermes", "pythonic", "json", "harmony"]); +export const toolDialectSchema = z.enum([ + "hermes", + "pythonic", + "json", + "harmony", +]); export const attachmentSchema = z.object({ path: z @@ -49,10 +57,7 @@ export const generationParamsSchema = z .describe( "Max tokens to predict. `-1` = until stop token, `-2` = until context filled.", ), - seed: z - .number() - .optional() - .describe("Random seed for reproducibility."), + seed: z.number().optional().describe("Random seed for reproducibility."), frequency_penalty: z .number() .optional() @@ -78,7 +83,12 @@ export const responseFormatSchema = z.discriminatedUnion("type", [ type: z.literal("json_schema"), json_schema: z .object({ - name: z.string().min(1).describe("Schema identifier; OpenAI-compatibility only — not used by the addon."), + name: z + .string() + .min(1) + .describe( + "Schema identifier; OpenAI-compatibility only — not used by the addon.", + ), description: z .string() .optional() @@ -165,6 +175,13 @@ const completionClientParamsBaseSchema = completionParamsSchema.extend({ .describe( "Optional structured-output constraint: `text` (default, free-form), `json_object` (any valid JSON), or `json_schema` (output conforms to the provided JSON Schema). Mutually exclusive with `tools`.", ), + requestId: z + .string() + .min(1) + .optional() + .describe( + "Stable identifier for this in-flight request, generated by the client at call time (UUIDv4). Surfaced on the `CompletionRun` result so callers can target it with `cancel({ requestId })`. 
Optional on the wire so legacy clients keep working — the server falls back to a server-generated id when the field is missing.", + ), }); function refineNoToolsWithStructuredOutput( diff --git a/packages/sdk/schemas/sdk-errors-server.ts b/packages/sdk/schemas/sdk-errors-server.ts index 5833f00b12..f39d1edd5f 100644 --- a/packages/sdk/schemas/sdk-errors-server.ts +++ b/packages/sdk/schemas/sdk-errors-server.ts @@ -38,6 +38,8 @@ export const SDK_SERVER_ERROR_CODES = { INVALID_IMAGE_INPUT: 52414, TEXT_TO_SPEECH_STREAM_FAILED: 52415, MODEL_OPERATION_NOT_SUPPORTED: 52416, + REQUEST_ID_CONFLICT: 52417, + REQUEST_NOT_FOUND: 52418, // RAG Operations (52,800-52,999) RAG_SAVE_FAILED: 52800, @@ -284,6 +286,16 @@ const serverErrorDefinitions: ErrorCodesMap = { return `Model "${modelId}" (type: ${modelType}) does not support ${operation}.${supportedClause}${suggestionClause}`; }, }, + [SDK_SERVER_ERROR_CODES.REQUEST_ID_CONFLICT]: { + name: "REQUEST_ID_CONFLICT", + message: (requestId: string) => + `Request id "${requestId}" is already in flight; refusing to overwrite the existing context`, + }, + [SDK_SERVER_ERROR_CODES.REQUEST_NOT_FOUND]: { + name: "REQUEST_NOT_FOUND", + message: (requestId: string) => + `No in-flight request with id "${requestId}"`, + }, // RAG Operations (52,800-52,999) [SDK_SERVER_ERROR_CODES.RAG_SAVE_FAILED]: { diff --git a/packages/sdk/scripts/run-unit-tests.ts b/packages/sdk/scripts/run-unit-tests.ts index 949c25c762..af507a20ff 100644 --- a/packages/sdk/scripts/run-unit-tests.ts +++ b/packages/sdk/scripts/run-unit-tests.ts @@ -5,12 +5,26 @@ import { fileURLToPath } from "url"; const __dirname = dirname(fileURLToPath(import.meta.url)); const testDir = join(__dirname, "..", "test", "unit"); -const testFiles = readdirSync(testDir).filter((f) => f.endsWith(".test.ts")); + +function collectTestFiles(dir: string): string[] { + const files: string[] = []; + for (const entry of readdirSync(dir, { withFileTypes: true })) { + const fullPath = join(dir, 
entry.name); + if (entry.isDirectory()) { + files.push(...collectTestFiles(fullPath)); + } else if (entry.isFile() && entry.name.endsWith(".test.ts")) { + files.push(fullPath); + } + } + return files; +} + +const testFiles = collectTestFiles(testDir); let hasFailure = false; for (const file of testFiles) { - const result = spawnSync("bun", ["run", join(testDir, file)], { + const result = spawnSync("bun", ["run", file], { stdio: "inherit", }); if (result.status !== 0) { diff --git a/packages/sdk/server/bare/ops/cancel.ts b/packages/sdk/server/bare/ops/cancel.ts index 2d58f05072..6cd7fe61c9 100644 --- a/packages/sdk/server/bare/ops/cancel.ts +++ b/packages/sdk/server/bare/ops/cancel.ts @@ -4,9 +4,27 @@ import { cancelInferenceBaseSchema, } from "@/schemas"; import { ModelNotLoadedError } from "@/utils/errors-server"; -import { noteCancelRequested } from "@/server/bare/plugins/llamacpp-completion/ops/kv-cache-state"; +import { getRequestRegistry } from "@/server/bare/runtime"; +import type { RequestKind } from "@/server/bare/runtime"; +import { getServerLogger } from "@/logging"; -export async function cancel(params: CancelInferenceBaseParams) { +const logger = getServerLogger(); + +/** + * Broad cancel: abort every in-flight request matching `modelId` (and + * optionally a `kind`). Maps onto `RequestRegistry.cancel({ modelId })` + * — the registry walks active contexts and aborts each one's signal, + * which the inference handler has wired to the addon's `cancel()`. + * + * Kept as a stable surface alongside the new `cancel({ requestId })` + * path: the caller may not have a `requestId` to hand (model unload, + * app shutdown, admin sweeps), and the escape hatch is cheap because + * the registry already does the matching. 
+ */ +export function cancel( + params: CancelInferenceBaseParams, + opts?: { kind?: RequestKind }, +) { const { modelId } = cancelInferenceBaseSchema.parse(params); const model = getModel(modelId); @@ -14,14 +32,18 @@ export async function cancel(params: CancelInferenceBaseParams) { throw new ModelNotLoadedError(modelId); } - // Must be recorded *before* `addon.cancel()` so the in-flight - // `completion()` for this model sees the bumped counter when it - // snapshots after `processModelResponse` returns. This is the signal - // that tells `completion()` not to record a `savedCount` for the - // kv-cache on a cancelled turn. - noteCancelRequested(modelId); + const registry = getRequestRegistry(); + const target = opts?.kind + ? { modelId, kind: opts.kind } + : { modelId }; + const cancelled = registry.cancel(target); - if (model.addon && model.addon.cancel) { - await model.addon.cancel(); + // No active request to cancel is not a hard error — callers (workbench + // "Stop" button, app shutdown sweeps) often fire-and-forget. Log so + // operators can see when a cancel landed against an empty registry. + if (cancelled === 0) { + logger.debug( + `[cancel] no in-flight request matched modelId=${modelId}${opts?.kind ? 
` kind=${opts.kind}` : ""}`, + ); } } diff --git a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts index 517b8025c1..bbb1bd33cc 100644 --- a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts +++ b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts @@ -37,9 +37,6 @@ import { cachedMessageCounts, clearCachedMessageCounts as clearCachedMessageCountsFromState, decideCachedHistorySlice, - noteCancelRequested as noteCancelRequestedFromState, - shouldRecordSavedCount, - snapshotCancelCount, } from "@/server/bare/plugins/llamacpp-completion/ops/kv-cache-state"; import { appendToolsToHistory, @@ -117,7 +114,26 @@ type CompletionRunOptions = Pick< export function clearCachedMessageCounts(prefix?: string): void { clearCachedMessageCountsFromState(prefix, path.sep); } -export const noteCancelRequested = noteCancelRequestedFromState; + +/** + * Decide whether a completed turn earned the right to record its kv-cache + * boundary. A `savedCount` is only safe to write when the turn ran to + * completion AND produced at least one token — anything else (cancelled + * mid-decode, zero-token reply, early EOS) leaves the on-disk cache file + * in an unknown state relative to `history.length + 1`, and a stale entry + * would slice the next turn's history down to an empty payload. + * + * Replaces the pre-0.11.0 `shouldRecordSavedCount(wasCancelled, ...)` with + * a signal-driven check that reads directly from the request's + * `AbortSignal`. The local helper keeps both call sites in + * `completion-stream.ts` honest without importing the registry every time. + */ +function shouldRecordSavedCount( + signal: AbortSignal, + producedTokens: boolean, +): boolean { + return !signal.aborted && producedTokens; +} // Verify the addon actually persisted the cache file before recording its // message count. 
The addon currently swallows write errors silently, so a @@ -434,9 +450,11 @@ export async function* completion( toolDialect?: ToolDialect; responseFormat?: ResponseFormat; }, + opts: { signal: AbortSignal }, ): AsyncGenerator<{ token: string }, CompletionResult, unknown> { const { history, modelId, kvCache, tools, generationParams, responseFormat } = params; + const { signal } = opts; const modelConfig = getModelConfig(modelId); const toolsEnabled = (modelConfig as { tools?: boolean }).tools === true; @@ -471,6 +489,22 @@ export async function* completion( const model = getModel(modelId); + // Hard-cancel wiring: when the registry aborts the request's signal, + // forward to the addon so the C++ work stops as soon as it can. The + // SDK still treats `signal.aborted` as the truth for cancel detection + // (post-completion bookkeeping below) — this listener only shortens + // the latency between "user clicked stop" and "addon stops decoding". + // Best-effort fire-and-forget: the addon's cancel resolves quickly and + // we can't await it from inside an event listener; the iterator below + // will see EOF/empty tokens once the C++ side returns. 
+ const onAbort = () => { + const addon = model.addon; + if (addon?.cancel) { + void addon.cancel.call(addon); + } + }; + signal.addEventListener("abort", onAbort, { once: true }); + if (kvCache) { const systemPromptFromHistory = extractSystemPrompt(history); // Dynamic mode lets each turn carry its own tool set, so the cache @@ -516,7 +550,6 @@ export async function* completion( ); logMessagesToAddon(messagesToSend, "PROMPT_SEND"); - const cancelCountBefore = snapshotCancelCount(modelId); const result = yield* processModelResponse( model, messagesToSend, @@ -525,9 +558,8 @@ export async function* completion( { cacheKey: cachePathToUse, saveCacheToDisk: true }, dialect, ); - const wasCancelled = snapshotCancelCount(modelId) > cancelCountBefore; - if (shouldRecordSavedCount(wasCancelled, result.producedTokens)) { + if (shouldRecordSavedCount(signal, result.producedTokens)) { // Turn ran to completion and produced content — record the new // boundary so the next turn can slice its history. await recordCacheSaveCount(cachePathToUse, history.length + 1); @@ -603,7 +635,6 @@ export async function* completion( ); logMessagesToAddon(messagesToSend, "PROMPT_SEND"); - const cancelCountBefore = snapshotCancelCount(modelId); const result = yield* processModelResponse( model, messagesToSend, @@ -612,7 +643,6 @@ export async function* completion( { cacheKey: cachePathToUse, saveCacheToDisk: true }, dialect, ); - const wasCancelled = snapshotCancelCount(modelId) > cancelCountBefore; // TODO: support auto-cache for tool-call turns by keying off the // structured assistant/tool messages callers push into history, @@ -640,7 +670,7 @@ export async function* completion( // which is empty/partial in those cases, and the on-disk cache the // addon wrote is not aligned with the current-history hash. Treat it // like the tool-call branch — drop the cache file and clear the count. 
- if (!shouldRecordSavedCount(wasCancelled, result.producedTokens)) { + if (!shouldRecordSavedCount(signal, result.producedTokens)) { try { await fsPromises.unlink(cachePathToUse); } catch (unlinkError) { diff --git a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/kv-cache-state.ts b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/kv-cache-state.ts index f0ff5bac6c..335423da5d 100644 --- a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/kv-cache-state.ts +++ b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/kv-cache-state.ts @@ -7,6 +7,14 @@ * in the Bare runtime (which is not available in that environment). * The file-system-dependent pieces (e.g. `recordCacheSaveCount`) live in * `completion-stream.ts` and consume the state exported here. + * + * History note (QVAC-18181, SDK 0.11.0): this module previously also + * carried a per-model cancel counter (`modelCancelCounters` / + * `noteCancelRequested` / `snapshotCancelCount`) used by `completion()` + * to detect mid-decode cancellation. That side channel was retired + * alongside the introduction of `RequestRegistry` — the in-flight + * `RequestContext.signal` is now the single source of truth and + * `completion-stream.ts` reads `signal.aborted` directly. */ /** @@ -37,15 +45,6 @@ */ export const cachedMessageCounts = new Map(); -/** - * Monotonic counter of cancel requests per model. `completion()` snapshots - * the value before running the model and compares afterward to decide - * whether the turn was cancelled mid-decode. A counter (not a boolean) is - * used deliberately: a cancel that lands near a turn boundary cannot bleed - * into the next turn because we always compare snapshots, not values. - */ -const modelCancelCounters = new Map(); - /** * Clear bookkeeping entries. With no argument, clears the whole map. 
With a * `prefix`, removes any entry whose path is equal to it OR sits beneath it @@ -73,44 +72,6 @@ export function clearCachedMessageCounts(prefix?: string, sep = "/"): void { } } -/** - * Called from `server/bare/ops/cancel.ts` right before `addon.cancel()` so - * that the in-flight `completion()` for this model can detect that its - * current turn is being cancelled and skip poisoning `cachedMessageCounts` - * with a `history.length + 1` entry that does not correspond to a real - * assistant reply. - */ -export function noteCancelRequested(modelId: string): void { - modelCancelCounters.set( - modelId, - (modelCancelCounters.get(modelId) ?? 0) + 1, - ); -} - -export function snapshotCancelCount(modelId: string): number { - return modelCancelCounters.get(modelId) ?? 0; -} - -/** Test-only. */ -export function _resetCancelCountersForTest(): void { - modelCancelCounters.clear(); -} - -/** - * A completion's `savedCount` should only be recorded when the turn ran to - * completion AND produced at least one token. Any other outcome — - * cancelled mid-decode, legitimate zero-token reply, or an early EOS — must - * clear the entry instead, because there is no guarantee that the kv-cache - * file on disk matches the `history.length + 1` boundary the SDK would - * otherwise record. 
- */ -export function shouldRecordSavedCount( - wasCancelled: boolean, - producedTokens: boolean, -): boolean { - return !wasCancelled && producedTokens; -} - export interface HistoryMessage { role: string; content: string; diff --git a/packages/sdk/server/bare/plugins/llamacpp-completion/plugin.ts b/packages/sdk/server/bare/plugins/llamacpp-completion/plugin.ts index ef0b60b7f3..89d36b7ea0 100644 --- a/packages/sdk/server/bare/plugins/llamacpp-completion/plugin.ts +++ b/packages/sdk/server/bare/plugins/llamacpp-completion/plugin.ts @@ -30,6 +30,8 @@ import { attachModelExecutionMs } from "@/profiling/model-execution"; import { getModelConfig } from "@/server/bare/registry/model-registry"; import { createCompletionNormalizer } from "@/server/utils/completion-normalizer"; import { detectToolDialect } from "@/server/utils/tool-integration"; +import { getRequestRegistry } from "@/server/bare/runtime"; +import { generateServerRequestId } from "@/server/bare/runtime/request-id"; function transformLlmConfig(llmConfig: LlmConfig) { const transformed = JSON.parse( @@ -165,16 +167,31 @@ export const llmPlugin = definePlugin({ toolDialect: dialect, }); - const stream = completion({ - history: filteredHistory, + // Open a request-scoped lifecycle. The registry is the single + // source of truth for "is this turn cancelled?" — we plumb the + // signal into `completion()` and expose `requestId` so the + // client can target this run with `cancel({ requestId })`. + // Falls back to a server-generated id if the client (e.g. an + // older release) didn't send one. + await using ctx = getRequestRegistry().begin({ + requestId: request.requestId ?? 
generateServerRequestId(), + kind: "completion", modelId: request.modelId, - kvCache: request.kvCache, - ...(toolsActive && request.tools && { tools: request.tools }), - ...(request.generationParams && { generationParams: request.generationParams }), - ...(toolsActive && { toolDialect: dialect }), - ...(request.responseFormat && { responseFormat: request.responseFormat }), }); + const stream = completion( + { + history: filteredHistory, + modelId: request.modelId, + kvCache: request.kvCache, + ...(toolsActive && request.tools && { tools: request.tools }), + ...(request.generationParams && { generationParams: request.generationParams }), + ...(toolsActive && { toolDialect: dialect }), + ...(request.responseFormat && { responseFormat: request.responseFormat }), + }, + { signal: ctx.signal }, + ); + try { const batchedEvents: CompletionEvent[] = []; let result = await stream.next(); diff --git a/packages/sdk/server/bare/runtime/disposable-scope.ts b/packages/sdk/server/bare/runtime/disposable-scope.ts new file mode 100644 index 0000000000..3d6ee9f6c7 --- /dev/null +++ b/packages/sdk/server/bare/runtime/disposable-scope.ts @@ -0,0 +1,92 @@ +/** + * Bounded-lifetime cleanup scope for an in-flight request. + * + * Callers register cleanup callbacks via `defer(...)` while a handler runs. + * On `[Symbol.asyncDispose]()` (used implicitly by `await using` or + * explicitly) every registered callback is awaited in **LIFO** order — the + * mirror of how `try/finally` blocks would unwind in a sequential write-up + * of the same handler. + * + * Guarantees: + * + * - **Idempotent.** A scope can be disposed once. Subsequent disposes are + * no-ops; subsequent `defer` calls run the cleanup eagerly so callers + * never silently leak a deferred resource if they hand a deferred + * callback to an already-disposed scope. + * - **Error aggregation.** Every deferred cleanup runs even when an + * earlier one throws. If a single cleanup throws, that error is + * rethrown verbatim. 
If two or more throw, an `AggregateError` is + * rethrown carrying every failure in the order it occurred. The + * handler unwinding the scope can therefore see _all_ cleanup + * failures, not just the first one. + */ + +export interface DisposableScope { + /** + * Register a cleanup callback. Cleanups run on dispose in LIFO order. + * Calling `defer` after the scope has been disposed runs the cleanup + * eagerly so resources never leak silently. + */ + defer(cleanup: () => Promise | void): void; + /** Run all registered cleanups. Idempotent. */ + [Symbol.asyncDispose](): Promise; + /** True once a dispose has run every registered cleanup to completion. */ + readonly disposed: boolean; +} + +export function createDisposableScope(): DisposableScope { + const cleanups: Array<() => Promise | void> = []; + let disposed = false; + let disposing = false; + + function defer(cleanup: () => Promise | void) { + if (disposed || disposing) { + void runEagerly(cleanup); + return; + } + cleanups.push(cleanup); + } + + async function runEagerly(cleanup: () => Promise | void) { + try { + await cleanup(); + } catch { + // Late-deferred cleanups have no caller waiting on them; the + // unwinding path that disposed the scope already returned its own + // errors. Swallowing here matches the user's intent of registering + // a fire-and-forget cleanup.
+ } + } + + async function dispose() { + if (disposed || disposing) return; + disposing = true; + const errors: unknown[] = []; + while (cleanups.length > 0) { + const cleanup = cleanups.pop(); + if (!cleanup) continue; + try { + await cleanup(); + } catch (err) { + errors.push(err); + } + } + disposed = true; + disposing = false; + if (errors.length === 1) throw errors[0]; + if (errors.length > 1) { + throw new AggregateError( + errors, + `DisposableScope: ${errors.length} cleanups failed`, + ); + } + } + + return { + defer, + [Symbol.asyncDispose]: dispose, + get disposed() { + return disposed; + }, + }; +} diff --git a/packages/sdk/server/bare/runtime/index.ts b/packages/sdk/server/bare/runtime/index.ts new file mode 100644 index 0000000000..a84d2777f4 --- /dev/null +++ b/packages/sdk/server/bare/runtime/index.ts @@ -0,0 +1,22 @@ +export { createDisposableScope } from "@/server/bare/runtime/disposable-scope"; +export type { DisposableScope } from "@/server/bare/runtime/disposable-scope"; + +export type { + RequestContext, + RequestKind, + RequestState, +} from "@/server/bare/runtime/request-context"; + +export { + createRequestRegistry, + getRequestRegistry, +} from "@/server/bare/runtime/request-registry-singleton"; +export type { + BeginOpts, + CancelByModelId, + CancelByRequestId, + CancelTarget, + ManagedRequestContext, + RequestOutcome, + RequestRegistry, +} from "@/server/bare/runtime/request-registry"; diff --git a/packages/sdk/server/bare/runtime/request-context.ts b/packages/sdk/server/bare/runtime/request-context.ts new file mode 100644 index 0000000000..cde61b45f4 --- /dev/null +++ b/packages/sdk/server/bare/runtime/request-context.ts @@ -0,0 +1,66 @@ +import type { DisposableScope } from "@/server/bare/runtime/disposable-scope"; + +/** + * Coarse classification of a long-running request. Used by + * `RequestRegistry.cancel({ modelId, kind })` so a broad cancel can target + * just one operation kind on a given model (e.g. 
cancel an in-flight + * completion without touching a finetune running on the same model). + * + * The set is intentionally open-coded — adding a new kind is a one-line + * change and the union surfaces in editor autocomplete at every call site. + */ +export type RequestKind = + | "completion" + | "embeddings" + | "transcribe" + | "translate" + | "diffusion" + | "tts" + | "ocr" + | "finetune" + | "loadModel" + | "downloadAsset" + | "rag"; + +/** + * Lifecycle states a request transitions through. A new context starts in + * `"running"`. `cancel(...)` flips it to `"cancelling"` and aborts the + * signal; `end(requestId, "completed" | "failed" | "cancelled")` flips + * it to a terminal state and removes it from the registry. + * + * Kept as a string union (not a state machine) on purpose — handlers read + * `state` defensively at only a couple of points, and a flat enum is easier + * to log/assert than a transition table. + */ +export type RequestState = + | "running" + | "cancelling" + | "completed" + | "failed" + | "cancelled"; + +/** + * Per-request lifecycle handle. Created by `RequestRegistry.begin(...)` + * and consumed by long-running handlers as the single owner of: + * + * - `requestId` — stable identity; visible to the client so it can + * target this exact request with `cancel({ requestId })`. + * - `signal` — `AbortSignal` that fires when the request is cancelled. + * Composes with addon-level cancellation through a single + * `signal.addEventListener("abort", ...)` hook installed by the + * handler. + * - `scope` — `DisposableScope` for `await using` / `Symbol.asyncDispose` + * cleanup. The scope unwinds whether the handler returns, throws, or + * is cancelled — there is no manual cleanup path for handlers to + * forget on the cancel branch. + * - `state` — current lifecycle state. Treat as read-mostly; the + * registry mutates it.
+ */ +export interface RequestContext { + readonly requestId: string; + readonly kind: RequestKind; + readonly modelId: string | undefined; + readonly signal: AbortSignal; + readonly scope: DisposableScope; + state: RequestState; +} diff --git a/packages/sdk/server/bare/runtime/request-id.ts b/packages/sdk/server/bare/runtime/request-id.ts new file mode 100644 index 0000000000..fb38de732f --- /dev/null +++ b/packages/sdk/server/bare/runtime/request-id.ts @@ -0,0 +1,15 @@ +import { randomUUID } from "bare-crypto"; + +/** + * Server-side fallback for `requestId`. The new wire contract is that + * the client generates the id (UUIDv4) at call time so it's available + * synchronously before the first network round-trip. To keep older + * clients working, the request schema marks `requestId` optional and the + * server fills it in here when it's missing. + * + * `bare-crypto.randomUUID()` mirrors Node's `crypto.randomUUID()` and is + * Bare-runtime safe. Returns a v4 UUID. + */ +export function generateServerRequestId(): string { + return randomUUID(); +} diff --git a/packages/sdk/server/bare/runtime/request-registry-singleton.ts b/packages/sdk/server/bare/runtime/request-registry-singleton.ts new file mode 100644 index 0000000000..ecfc657f36 --- /dev/null +++ b/packages/sdk/server/bare/runtime/request-registry-singleton.ts @@ -0,0 +1,23 @@ +import { + createRequestRegistry as createRegistry, + type RequestRegistry, +} from "@/server/bare/runtime/request-registry"; + +/** + * Worker-process singleton. Every long-running request in this Bare + * worker registers under this registry, so a `cancel({ requestId })` RPC + * can find its target without the caller needing to know which plugin / + * handler owns the request. + * + * Exposed alongside `createRequestRegistry()` rather than replacing it so + * unit tests can spin up isolated registries without contaminating the + * shared instance. 
+ */ +let registry: RequestRegistry | null = null; + +export function getRequestRegistry(): RequestRegistry { + if (!registry) registry = createRegistry(); + return registry; +} + +export { createRegistry as createRequestRegistry }; diff --git a/packages/sdk/server/bare/runtime/request-registry.ts b/packages/sdk/server/bare/runtime/request-registry.ts new file mode 100644 index 0000000000..f4329b9adb --- /dev/null +++ b/packages/sdk/server/bare/runtime/request-registry.ts @@ -0,0 +1,253 @@ +import { + createDisposableScope, + type DisposableScope, +} from "@/server/bare/runtime/disposable-scope"; +import type { + RequestContext, + RequestKind, + RequestState, +} from "@/server/bare/runtime/request-context"; +import { RequestIdConflictError } from "@/utils/errors-server"; + +/** + * Outcome the caller declares when terminating a request through + * `registry.end(...)`. The registry maps it to a terminal `RequestState` + * before disposing the scope so observers see a coherent final state. + */ +export type RequestOutcome = "completed" | "failed" | "cancelled"; + +export interface BeginOpts { + /** Stable identity. Caller-provided so the client and server agree. */ + requestId: string; + kind: RequestKind; + modelId?: string; + /** + * Optional parent abort signal — typically the worker-level "shutdown" + * signal. When the parent aborts, the request's own signal aborts too. + * Composes through an `addEventListener("abort", ...)` hook so cancelling + * the parent does not require iterating the registry. + */ + parentSignal?: AbortSignal; +} + +export interface CancelByRequestId { + requestId: string; + reason?: string; +} + +export interface CancelByModelId { + modelId: string; + kind?: RequestKind; + reason?: string; +} + +export type CancelTarget = CancelByRequestId | CancelByModelId; + +/** + * `ManagedRequestContext` is the value `begin(...)` returns.
It extends + * `RequestContext` with an async-dispose method so handlers can write: + * + * await using ctx = registry.begin({ ... }); + * + * On dispose the scope unwinds (LIFO cleanup) and the registry slot is + * freed. If the handler doesn't override `ctx.state` before unwinding, + * the registry derives the terminal state from `signal.aborted` — + * `"cancelled"` when an abort was recorded, `"completed"` otherwise. + */ +export interface ManagedRequestContext extends RequestContext { + [Symbol.asyncDispose](): Promise; +} + +export interface RequestRegistry { + /** + * Open a new request. Throws `RequestIdConflictError` if `requestId` is + * already present (UUIDv4 collision is astronomically unlikely; the + * guard exists so a buggy client retry sending the same id can't + * silently overwrite an in-flight request). + */ + begin(opts: BeginOpts): ManagedRequestContext; + + /** Look up an in-flight request by id. */ + get(requestId: string): RequestContext | null; + + /** + * Snapshot of currently-tracked requests. Useful for diagnostics / + * structured logs ("which requests are in flight right now?"). Returns + * a fresh array; mutations on it are not observed by the registry. + */ + list(): RequestContext[]; + + /** + * Cancel matching requests. Returns the number of contexts whose abort + * was triggered by *this* call (already-cancelled contexts are skipped + * so callers can rely on the count to log "n requests cancelled" once). + * + * For `{ modelId }` and an optional `kind`, cancels every active + * request that matches the predicate. This is the broad-cancel path + * the pre-registry `cancel({ modelId })` API maps to. + */ + cancel(target: CancelTarget): number; + + /** + * Cancel every active request — the worker-shutdown / model-unload + * sweep. The reason is forwarded to each request as the abort reason + * so handler logs can distinguish a normal cancel from a sweep. 
+ * Resolves once all targeted contexts have flipped to `"cancelling"`; + * scope unwinding still happens on each handler's own dispose path. + */ + cancelAll(reason: "shutdown" | "modelUnload"): Promise; + + /** + * Mark a request finished and dispose its scope. Equivalent to + * `await ctx[Symbol.asyncDispose]()` with an explicit outcome. + * Idempotent — calling `end` after a scope dispose is a no-op. + */ + end(requestId: string, outcome: RequestOutcome): Promise; +} + +interface RegistryEntry { + ctx: RequestContext; + controller: AbortController; + scope: DisposableScope; + /** + * Cleanup hook removed from `parentSignal` after the request ends, so + * a long-lived shutdown signal doesn't accumulate per-request listeners + * for the lifetime of the worker. + */ + detachParent: () => void; +} + +export function createRequestRegistry(): RequestRegistry { + const entries = new Map(); + + function cancelEntry(entry: RegistryEntry, reason?: string): boolean { + if (entry.controller.signal.aborted) return false; + entry.ctx.state = "cancelling"; + entry.controller.abort(reason); + return true; + } + + async function disposeEntry( + entry: RegistryEntry, + outcome: RequestOutcome, + ): Promise { + if (entry.scope.disposed) return; + entry.ctx.state = outcome; + entry.detachParent(); + // Pull the entry out before unwinding so observers (e.g. a `cancel(...)` + // racing with dispose) don't see a half-disposed context. 
+ entries.delete(entry.ctx.requestId); + await entry.scope[Symbol.asyncDispose](); + } + + function begin(opts: BeginOpts): ManagedRequestContext { + if (entries.has(opts.requestId)) { + throw new RequestIdConflictError(opts.requestId); + } + + const controller = new AbortController(); + const scope = createDisposableScope(); + + let detachParent = () => {}; + if (opts.parentSignal) { + const parent = opts.parentSignal; + if (parent.aborted) { + controller.abort(parent.reason); + } else { + const onParentAbort = () => controller.abort(parent.reason); + parent.addEventListener("abort", onParentAbort, { once: true }); + detachParent = () => parent.removeEventListener("abort", onParentAbort); + } + } + + const ctx: RequestContext = { + requestId: opts.requestId, + kind: opts.kind, + modelId: opts.modelId, + signal: controller.signal, + scope, + state: "running", + }; + + const entry: RegistryEntry = { ctx, controller, scope, detachParent }; + entries.set(opts.requestId, entry); + + return { + get requestId() { + return ctx.requestId; + }, + get kind() { + return ctx.kind; + }, + get modelId() { + return ctx.modelId; + }, + get signal() { + return ctx.signal; + }, + get scope() { + return ctx.scope; + }, + get state() { + return ctx.state; + }, + set state(next: RequestState) { + ctx.state = next; + }, + [Symbol.asyncDispose]: async () => { + await disposeEntry(entry, derivedTerminalState(ctx)); + }, + }; + } + + function get(requestId: string): RequestContext | null { + return entries.get(requestId)?.ctx ?? 
null; + } + + function list(): RequestContext[] { + return Array.from(entries.values(), (e) => e.ctx); + } + + function cancel(target: CancelTarget): number { + let cancelled = 0; + if ("requestId" in target) { + const entry = entries.get(target.requestId); + if (entry && cancelEntry(entry, target.reason)) cancelled++; + return cancelled; + } + for (const entry of entries.values()) { + if (entry.ctx.modelId !== target.modelId) continue; + if (target.kind && entry.ctx.kind !== target.kind) continue; + if (cancelEntry(entry, target.reason)) cancelled++; + } + return cancelled; + } + + function cancelAll(reason: "shutdown" | "modelUnload"): Promise { + for (const entry of entries.values()) { + cancelEntry(entry, reason); + } + // The interface returns `Promise<void>` so we can later make this an + // async sweep that awaits per-handler scope unwinding (e.g. join on + // the disposers). Today every handler unwinds on its own dispose + // path, so the function only needs to fire-and-forget the abort.
+ return Promise.resolve(); + } + + async function end( + requestId: string, + outcome: RequestOutcome, + ): Promise { + const entry = entries.get(requestId); + if (!entry) return; + await disposeEntry(entry, outcome); + } + + return { begin, get, list, cancel, cancelAll, end }; +} + +function derivedTerminalState(ctx: RequestContext): RequestOutcome { + if (ctx.state === "failed") return "failed"; + if (ctx.signal.aborted || ctx.state === "cancelled") return "cancelled"; + return "completed"; +} diff --git a/packages/sdk/server/rpc/handlers/cancelHandler.ts b/packages/sdk/server/rpc/handlers/cancelHandler.ts index 8daf96e962..62c7d74fb9 100644 --- a/packages/sdk/server/rpc/handlers/cancelHandler.ts +++ b/packages/sdk/server/rpc/handlers/cancelHandler.ts @@ -5,19 +5,33 @@ import { cancelRagOperation, DEFAULT_WORKSPACE, } from "@/server/bare/rag-hyperdb"; +import { getRequestRegistry } from "@/server/bare/runtime"; import { getServerLogger } from "@/logging"; const logger = getServerLogger(); -export async function cancelHandler( +export function cancelHandler( request: CancelRequest, ): Promise { try { switch (request.operation) { case "inference": + cancel({ modelId: request.modelId }, { kind: "completion" }); + break; case "embeddings": - await cancel({ modelId: request.modelId }); + cancel({ modelId: request.modelId }, { kind: "embeddings" }); break; + case "request": { + const cancelled = getRequestRegistry().cancel({ + requestId: request.requestId, + }); + if (cancelled === 0) { + logger.debug( + `[cancel] no in-flight request matched requestId=${request.requestId}`, + ); + } + break; + } case "downloadAsset": cancelTransfer(request.downloadKey, request.clearCache); break; @@ -32,16 +46,16 @@ export async function cancelHandler( } } - return { + return Promise.resolve({ type: "cancel", success: true, - }; + }); } catch (error) { logger.error("Error during cancellation:", error); - return { + return Promise.resolve({ type: "cancel", success: false, error: error 
instanceof Error ? error.message : "Unknown error", - }; + }); } } diff --git a/packages/sdk/test/unit/kv-cache-cancel.test.ts b/packages/sdk/test/unit/kv-cache-cancel.test.ts index 920f074470..8f221e4270 100644 --- a/packages/sdk/test/unit/kv-cache-cancel.test.ts +++ b/packages/sdk/test/unit/kv-cache-cancel.test.ts @@ -1,47 +1,36 @@ // @ts-expect-error brittle has no type declarations import test from "brittle"; import { - _resetCancelCountersForTest, cachedMessageCounts, clearCachedMessageCounts, decideCachedHistorySlice, - noteCancelRequested, - shouldRecordSavedCount, - snapshotCancelCount, type HistoryMessage, } from "@/server/bare/plugins/llamacpp-completion/ops/kv-cache-state"; // ----------------------------------------------------------------------------- -// Unit-level regression coverage for the kv-cache cancel/zero-token fix. +// Unit-level regression coverage for `decideCachedHistorySlice` — the pure +// piece of the kv-cache cancel/zero-token fix (QVAC-17780). // -// These tests cover the two pure pieces of the fix: -// 1. `decideCachedHistorySlice` never returns an empty message list when -// there is history to send (was the root cause of the "SDK returns -// nothing" symptom after a fast cancel). -// 2. `shouldRecordSavedCount` + `snapshotCancelCount` correctly refuse to -// record a `savedCount` for cancelled or zero-token turns (prevents -// the `cachedMessageCounts` map from being poisoned in the first place). -// -// Integration-level coverage (running the full `completion()` generator -// against a real model) requires a loaded model and therefore lives outside -// this unit suite. +// In SDK 0.11.0 the cancel-counter side channel that used to live in this +// module (`modelCancelCounters`, `noteCancelRequested`, `snapshotCancelCount`, +// `shouldRecordSavedCount`) was retired. 
Cancel detection now flows through +// the per-request `AbortSignal` from `RequestRegistry` (see +// `test/unit/runtime/request-registry.test.ts`) and `completion-stream.ts` +// reads `signal.aborted` directly. The slice-decision regression coverage +// below is still relevant — it guards the "stale savedCount → empty payload" +// failure mode that's independent of how cancel is plumbed. // ----------------------------------------------------------------------------- -const CACHE_PATH = "/tmp/kv-cache-cancel-test.bin"; +type T = { + alike: (actual: unknown, expected: unknown, msg?: string) => void; + is: (actual: unknown, expected: unknown, msg?: string) => void; +}; function resetState() { clearCachedMessageCounts(); - _resetCancelCountersForTest(); } -// ----------------------------------------------------------------------------- -// decideCachedHistorySlice -// ----------------------------------------------------------------------------- - -test("decideCachedHistorySlice: baseline slice when savedCount is valid", (t: { - alike: (actual: unknown, expected: unknown) => void; - is: (actual: unknown, expected: unknown) => void; -}) => { +test("decideCachedHistorySlice: baseline slice when savedCount is valid", (t: T) => { resetState(); const history: HistoryMessage[] = [ { role: "system", content: "sys" }, @@ -61,10 +50,7 @@ test("decideCachedHistorySlice: baseline slice when savedCount is valid", (t: { t.is(clearStaleCount, false); }); -test("decideCachedHistorySlice: stale count (slice would be empty) falls back and flags clear", (t: { - alike: (actual: unknown, expected: unknown) => void; - is: (actual: unknown, expected: unknown) => void; -}) => { +test("decideCachedHistorySlice: stale count (slice would be empty) falls back and flags clear", (t: T) => { resetState(); const history: HistoryMessage[] = [ { role: "system", content: "sys" }, @@ -87,10 +73,7 @@ test("decideCachedHistorySlice: stale count (slice would be empty) falls back an ); }); 
-test("decideCachedHistorySlice: savedCount > history.length falls back and flags clear", (t: { - alike: (actual: unknown, expected: unknown) => void; - is: (actual: unknown, expected: unknown) => void; -}) => { +test("decideCachedHistorySlice: savedCount > history.length falls back and flags clear", (t: T) => { resetState(); const history: HistoryMessage[] = [ { role: "system", content: "sys" }, @@ -105,10 +88,7 @@ test("decideCachedHistorySlice: savedCount > history.length falls back and flags t.is(clearStaleCount, true); }); -test("decideCachedHistorySlice: savedCount = 0, cache exists → strip system, no clear", (t: { - alike: (actual: unknown, expected: unknown) => void; - is: (actual: unknown, expected: unknown) => void; -}) => { +test("decideCachedHistorySlice: savedCount = 0, cache exists → strip system, no clear", (t: T) => { resetState(); const history: HistoryMessage[] = [ { role: "system", content: "sys" }, @@ -123,10 +103,7 @@ test("decideCachedHistorySlice: savedCount = 0, cache exists → strip system, n t.is(clearStaleCount, false); }); -test("decideCachedHistorySlice: cache does not exist → strip system regardless of savedCount", (t: { - alike: (actual: unknown, expected: unknown) => void; - is: (actual: unknown, expected: unknown) => void; -}) => { +test("decideCachedHistorySlice: cache does not exist → strip system regardless of savedCount", (t: T) => { resetState(); const history: HistoryMessage[] = [ { role: "system", content: "sys" }, @@ -145,21 +122,15 @@ test("decideCachedHistorySlice: cache does not exist → strip system regardless ); }); -test("decideCachedHistorySlice: empty history returns empty, no clear", (t: { - alike: (actual: unknown, expected: unknown) => void; - is: (actual: unknown, expected: unknown) => void; -}) => { +test("decideCachedHistorySlice: empty history returns empty, no clear", (t: T) => { resetState(); const { messages, clearStaleCount } = decideCachedHistorySlice(2, true, []); t.alike(messages, []); 
t.is(clearStaleCount, false); }); -test("decideCachedHistorySlice: savedCount = history.length slices to [] and flags clear", (t: { - alike: (actual: unknown, expected: unknown) => void; - is: (actual: unknown, expected: unknown) => void; -}) => { - // Exact shape of the reported bug: a cancelled turn records +test("decideCachedHistorySlice: savedCount = history.length slices to [] and flags clear", (t: T) => { + // Exact shape of the reported QVAC-17780 bug: a cancelled turn records // `history.length + 1` for a 2-message history; the user's next turn // has 3 messages and a savedCount of 3 — slicing yields []. The // fallback must fire. @@ -181,142 +152,7 @@ test("decideCachedHistorySlice: savedCount = history.length slices to [] and fla t.is(clearStaleCount, true); }); -// ----------------------------------------------------------------------------- -// shouldRecordSavedCount -// ----------------------------------------------------------------------------- - -test("shouldRecordSavedCount: true only for normal non-cancelled turns with tokens", (t: { - is: (actual: unknown, expected: unknown, msg?: string) => void; -}) => { - t.is( - shouldRecordSavedCount(false, true), - true, - "record on a normal, token-emitting turn", - ); - t.is( - shouldRecordSavedCount(true, true), - false, - "never record when cancelled, even if tokens flowed", - ); - t.is( - shouldRecordSavedCount(false, false), - false, - "never record a zero-token turn — cache state is unknown", - ); - t.is( - shouldRecordSavedCount(true, false), - false, - "cancelled + zero tokens → definitely do not record", - ); -}); - -// ----------------------------------------------------------------------------- -// cancel counter semantics -// ----------------------------------------------------------------------------- - -test("noteCancelRequested: per-model counters are isolated and monotonic", (t: { - is: (actual: unknown, expected: unknown) => void; -}) => { - resetState(); - const a = "model-A"; - const b = 
"model-B"; - - t.is(snapshotCancelCount(a), 0); - t.is(snapshotCancelCount(b), 0); - - noteCancelRequested(a); - t.is(snapshotCancelCount(a), 1); - t.is(snapshotCancelCount(b), 0, "cancels on A don't leak into B"); - - noteCancelRequested(a); - noteCancelRequested(b); - t.is(snapshotCancelCount(a), 2); - t.is(snapshotCancelCount(b), 1); -}); - -test("snapshot-before-snapshot-after detects a cancel recorded in between", (t: { - is: (actual: unknown, expected: unknown) => void; -}) => { - // This is the exact pattern used in `completion()` to detect cancellation: - // const before = snapshotCancelCount(modelId); - // ... run model ... - // const wasCancelled = snapshotCancelCount(modelId) > before; - resetState(); - const id = "model-X"; - const before = snapshotCancelCount(id); - noteCancelRequested(id); - const wasCancelled = snapshotCancelCount(id) > before; - t.is(wasCancelled, true); -}); - -test("snapshot-before-snapshot-after sees no cancel when none was recorded", (t: { - is: (actual: unknown, expected: unknown) => void; -}) => { - resetState(); - const id = "model-Y"; - // Simulate a prior cancel from earlier in the session. - noteCancelRequested(id); - const before = snapshotCancelCount(id); - // No cancel during this turn. - const wasCancelled = snapshotCancelCount(id) > before; - t.is( - wasCancelled, - false, - "old cancels do not re-fire on subsequent turns because we compare snapshots", - ); -}); - -// ----------------------------------------------------------------------------- -// End-to-end sequence simulated at the state layer: verifies what -// `completion()` would see across a cancelled turn followed by a fresh -// prompt. 
-// ----------------------------------------------------------------------------- - -test("cancelled turn → next turn still has a non-empty payload", (t: { - alike: (actual: unknown, expected: unknown) => void; - is: (actual: unknown, expected: unknown) => void; -}) => { - resetState(); - const id = "model-repro"; - const cachePath = "/tmp/qvac-17780.bin"; - - // Turn 1: user sends one message, cancels mid-decode. - const turn1History: HistoryMessage[] = [ - { role: "system", content: "sys" }, - { role: "user", content: "u1" }, - ]; - const t1Before = snapshotCancelCount(id); - noteCancelRequested(id); // cancel fires while generating - const t1Cancelled = snapshotCancelCount(id) > t1Before; - // completion() would refuse to record savedCount here. - t.is(shouldRecordSavedCount(t1Cancelled, /* producedTokens */ true), false); - // Make sure the map stays clean. - t.is(cachedMessageCounts.has(cachePath), false); - - // Turn 2: user types a second message immediately. History grows to 3. - // Because turn 1 did not record a savedCount, the slice decision is the - // "strip system, send rest" branch — NOT an empty payload. - const turn2History: HistoryMessage[] = [ - { role: "system", content: "sys" }, - { role: "user", content: "u1" }, - { role: "user", content: "u2" }, - ]; - const savedCount = cachedMessageCounts.get(cachePath) ?? 0; - const { messages } = decideCachedHistorySlice(savedCount, true, turn2History); - t.alike( - messages, - [ - { role: "user", content: "u1" }, - { role: "user", content: "u2" }, - ], - "turn 2 must carry real content, not an empty payload", - ); -}); - -test("regression: an externally-seeded stale savedCount still triggers the fallback", (t: { - alike: (actual: unknown, expected: unknown) => void; - is: (actual: unknown, expected: unknown) => void; -}) => { +test("regression: an externally-seeded stale savedCount still triggers the fallback", (t: T) => { // Belt-and-suspenders test: simulate an externally-poisoned savedCount // (e.g. 
from a pre-upgrade SDK instance still running in memory) and // confirm that `decideCachedHistorySlice` refuses to emit an empty diff --git a/packages/sdk/test/unit/runtime/disposable-scope.test.ts b/packages/sdk/test/unit/runtime/disposable-scope.test.ts new file mode 100644 index 0000000000..a252f0ca05 --- /dev/null +++ b/packages/sdk/test/unit/runtime/disposable-scope.test.ts @@ -0,0 +1,153 @@ +// @ts-expect-error brittle has no type declarations +import test from "brittle"; +import { createDisposableScope } from "@/server/bare/runtime/disposable-scope"; + +// ----------------------------------------------------------------------------- +// DisposableScope unit tests. +// +// Covers the four contracts handlers will rely on: +// 1. LIFO cleanup ordering (mirrors how try/finally would unwind a +// sequential write-up of the same handler). +// 2. Idempotent dispose — a scope that has already unwound is a no-op +// on subsequent dispose calls. +// 3. Error aggregation — every cleanup runs even when an earlier one +// throws; multiple failures arrive as a single AggregateError. +// 4. Late-defer behaviour — calling defer after dispose runs the +// cleanup eagerly so resources don't leak silently. 
+// ----------------------------------------------------------------------------- + +type T = { + is: (actual: unknown, expected: unknown, msg?: string) => void; + alike: (actual: unknown, expected: unknown, msg?: string) => void; + ok: (value: unknown, msg?: string) => void; + exception: ( + fn: () => Promise<unknown> | unknown, + matcher?: unknown, + msg?: string, + ) => Promise<void>; + pass: (msg?: string) => void; + fail: (msg?: string) => void; +}; + +test("disposable-scope: cleanups run in LIFO order", async (t: T) => { + const order: string[] = []; + const scope = createDisposableScope(); + scope.defer(() => { + order.push("first-registered"); + }); + scope.defer(() => { + order.push("second-registered"); + }); + scope.defer(async () => { + await Promise.resolve(); + order.push("third-registered"); + }); + + await scope[Symbol.asyncDispose](); + t.alike(order, ["third-registered", "second-registered", "first-registered"]); + t.is(scope.disposed, true); +}); + +test("disposable-scope: dispose is idempotent", async (t: T) => { + let runs = 0; + const scope = createDisposableScope(); + scope.defer(() => { + runs++; + }); + await scope[Symbol.asyncDispose](); + await scope[Symbol.asyncDispose](); + await scope[Symbol.asyncDispose](); + t.is(runs, 1, "deferred cleanup runs exactly once"); + t.is(scope.disposed, true); +}); + +test("disposable-scope: a single failing cleanup rethrows verbatim", async (t: T) => { + const scope = createDisposableScope(); + const boom = new Error("boom"); + scope.defer(() => { + throw boom; + }); + await t.exception(async () => { + await scope[Symbol.asyncDispose](); + }); +}); + +test("disposable-scope: multiple failures are collected into AggregateError", async (t: T) => { + const scope = createDisposableScope(); + let third = 0; + scope.defer(() => { + throw new Error("first"); + }); + scope.defer(() => { + third++; + }); + scope.defer(async () => { + await Promise.resolve(); + throw new Error("second"); + }); + + let captured: unknown = null; 
+ try { + await scope[Symbol.asyncDispose](); + } catch (err) { + captured = err; + } + + t.ok(captured instanceof AggregateError, "throws AggregateError"); + const agg = captured as AggregateError; + t.is(agg.errors.length, 2, "two underlying errors"); + t.is(third, 1, "non-throwing cleanup still runs"); +}); + +test("disposable-scope: every cleanup runs even when one throws midway", async (t: T) => { + const scope = createDisposableScope(); + let aRan = 0; + let cRan = 0; + scope.defer(() => { + aRan++; + }); + scope.defer(() => { + throw new Error("middle"); + }); + scope.defer(() => { + cRan++; + }); + try { + await scope[Symbol.asyncDispose](); + } catch { + // expected + } + t.is(aRan, 1, "earlier-registered cleanup ran despite later throw"); + t.is(cRan, 1, "later-registered cleanup ran first (LIFO) and unaffected"); +}); + +test("disposable-scope: late defer runs the cleanup eagerly", async (t: T) => { + const scope = createDisposableScope(); + await scope[Symbol.asyncDispose](); + t.is(scope.disposed, true); + + let lateRan = 0; + scope.defer(() => { + lateRan++; + }); + // Eager run is synchronous-or-microtask; yielding once is enough. 
+ await Promise.resolve(); + await Promise.resolve(); + t.is(lateRan, 1, "late-registered cleanup ran without leaking"); +}); + +test("disposable-scope: works with `await using` syntax", async (t: T) => { + const seen: string[] = []; + async function run() { + await using scope = createDisposableScope(); + scope.defer(() => { + seen.push("a"); + }); + scope.defer(() => { + seen.push("b"); + }); + seen.push("body"); + } + await run(); + t.alike(seen, ["body", "b", "a"], "await using disposes in LIFO order"); +}); diff --git a/packages/sdk/test/unit/runtime/request-registry.test.ts b/packages/sdk/test/unit/runtime/request-registry.test.ts new file mode 100644 index 0000000000..9655ba58cc --- /dev/null +++ b/packages/sdk/test/unit/runtime/request-registry.test.ts @@ -0,0 +1,263 @@ +// @ts-expect-error brittle has no type declarations +import test from "brittle"; +import { createRequestRegistry } from "@/server/bare/runtime/request-registry"; +import { RequestIdConflictError } from "@/utils/errors-server"; + +// ----------------------------------------------------------------------------- +// RequestRegistry unit tests. +// +// Covers the contract M1 hands to handler authors: +// - begin / get / list reflect a coherent in-flight set. +// - cancel-by-requestId targets exactly one entry. +// - cancel-by-modelId predicate fans out across entries with optional +// kind narrowing. +// - cancelAll fires every active request's signal exactly once. +// - Disposing the managed context (via `await using`) flips the state +// and removes the registry slot. +// - parentSignal compositions abort the request when the parent does. +// - RequestIdConflictError is thrown on duplicate ids. 
+// ----------------------------------------------------------------------------- + +type T = { + is: (actual: unknown, expected: unknown, msg?: string) => void; + alike: (actual: unknown, expected: unknown, msg?: string) => void; + ok: (value: unknown, msg?: string) => void; + exception: ( + fn: () => Promise<unknown> | unknown, + matcher?: unknown, + msg?: string, + ) => Promise<void>; +}; + +test("registry: begin/get/list track in-flight requests", async (t: T) => { + const r = createRequestRegistry(); + await using a = r.begin({ + requestId: "r-a", + kind: "completion", + modelId: "m1", + }); + await using b = r.begin({ + requestId: "r-b", + kind: "embeddings", + modelId: "m2", + }); + + t.is(r.get("r-a")?.requestId, "r-a"); + t.is(r.get("r-b")?.requestId, "r-b"); + t.is(r.get("missing"), null); + t.is(r.list().length, 2); + + // touch the variables so noUnusedLocals stays quiet. + t.is(a.kind, "completion"); + t.is(b.kind, "embeddings"); +}); + +test("registry: dispose removes the slot and flips state", async (t: T) => { + const r = createRequestRegistry(); + + async function run() { + await using ctx = r.begin({ + requestId: "r-1", + kind: "completion", + modelId: "m1", + }); + t.is(ctx.state, "running"); + t.is(r.list().length, 1); + } + + await run(); + t.is(r.list().length, 0, "scope unwind removed the registry slot"); + t.is(r.get("r-1"), null); +}); + +test("registry: cancel by requestId aborts only that signal", async (t: T) => { + const r = createRequestRegistry(); + await using a = r.begin({ + requestId: "r-a", + kind: "completion", + modelId: "m1", + }); + await using b = r.begin({ + requestId: "r-b", + kind: "completion", + modelId: "m1", + }); + + const cancelled = r.cancel({ requestId: "r-a" }); + t.is(cancelled, 1, "exactly one entry cancelled"); + t.is(a.signal.aborted, true); + t.is(a.state, "cancelling"); + t.is(b.signal.aborted, false, "sibling on the same model is untouched"); + t.is(b.state, "running"); +}); + +test("registry: cancel-by-requestId is 
idempotent and counts only first abort", async (t: T) => { + const r = createRequestRegistry(); + await using ctx = r.begin({ + requestId: "r-1", + kind: "completion", + modelId: "m1", + }); + t.is(r.cancel({ requestId: "r-1" }), 1); + t.is(r.cancel({ requestId: "r-1" }), 0, "second cancel returns 0"); + t.is(ctx.signal.aborted, true); +}); + +test("registry: cancel by modelId fans out across that model only", async (t: T) => { + const r = createRequestRegistry(); + await using a = r.begin({ + requestId: "r-a", + kind: "completion", + modelId: "m1", + }); + await using b = r.begin({ + requestId: "r-b", + kind: "embeddings", + modelId: "m1", + }); + await using c = r.begin({ + requestId: "r-c", + kind: "completion", + modelId: "m2", + }); + + const cancelled = r.cancel({ modelId: "m1" }); + t.is(cancelled, 2, "both m1 entries cancelled"); + t.is(a.signal.aborted, true); + t.is(b.signal.aborted, true); + t.is(c.signal.aborted, false); +}); + +test("registry: cancel by modelId + kind narrows the target", async (t: T) => { + const r = createRequestRegistry(); + await using a = r.begin({ + requestId: "r-a", + kind: "completion", + modelId: "m1", + }); + await using b = r.begin({ + requestId: "r-b", + kind: "embeddings", + modelId: "m1", + }); + + const cancelled = r.cancel({ modelId: "m1", kind: "completion" }); + t.is(cancelled, 1, "only the completion-kind entry cancelled"); + t.is(a.signal.aborted, true); + t.is(b.signal.aborted, false); +}); + +test("registry: cancelAll fires every signal", async (t: T) => { + const r = createRequestRegistry(); + await using a = r.begin({ + requestId: "r-a", + kind: "completion", + modelId: "m1", + }); + await using b = r.begin({ + requestId: "r-b", + kind: "loadModel", + modelId: "m2", + }); + await using c = r.begin({ + requestId: "r-c", + kind: "rag", + }); + + await r.cancelAll("shutdown"); + t.is(a.signal.aborted, true); + t.is(b.signal.aborted, true); + t.is(c.signal.aborted, true); +}); + +test("registry: parentSignal already 
aborted aborts the new context", async (t: T) => { + const r = createRequestRegistry(); + const parent = new AbortController(); + parent.abort("shutdown"); + await using ctx = r.begin({ + requestId: "r-1", + kind: "completion", + modelId: "m1", + parentSignal: parent.signal, + }); + t.is(ctx.signal.aborted, true); +}); + +test("registry: parentSignal aborts propagate to children", async (t: T) => { + const r = createRequestRegistry(); + const parent = new AbortController(); + await using ctx = r.begin({ + requestId: "r-1", + kind: "completion", + modelId: "m1", + parentSignal: parent.signal, + }); + t.is(ctx.signal.aborted, false); + parent.abort("shutdown"); + t.is(ctx.signal.aborted, true); +}); + +test("registry: duplicate requestId throws RequestIdConflictError", async (t: T) => { + const r = createRequestRegistry(); + await using first = r.begin({ + requestId: "r-1", + kind: "completion", + modelId: "m1", + }); + t.is(first.kind, "completion"); + await t.exception(() => { + r.begin({ requestId: "r-1", kind: "completion", modelId: "m1" }); + }, RequestIdConflictError); +}); + +test("registry: end(requestId) sets state, disposes scope, and removes slot", async (t: T) => { + const r = createRequestRegistry(); + let cleanupRan = 0; + const ctx = r.begin({ + requestId: "r-1", + kind: "completion", + modelId: "m1", + }); + ctx.scope.defer(() => { + cleanupRan++; + }); + + await r.end("r-1", "completed"); + t.is(cleanupRan, 1, "scope unwound"); + t.is(ctx.state, "completed"); + t.is(r.get("r-1"), null); +}); + +test("registry: end without prior begin is a no-op", async (t: T) => { + const r = createRequestRegistry(); + await r.end("does-not-exist", "completed"); + // no throw, no entries + t.is(r.list().length, 0); +}); + +test("registry: derived terminal state is 'cancelled' if signal aborted, 'completed' otherwise", async (t: T) => { + const r = createRequestRegistry(); + + async function cancelledRun() { + await using ctx = r.begin({ + requestId: "r-cancelled", + 
kind: "completion", + modelId: "m1", + }); + r.cancel({ requestId: "r-cancelled" }); + return ctx; + } + const cancelled = await cancelledRun(); + t.is(cancelled.state, "cancelled"); + + async function happyRun() { + await using ctx = r.begin({ + requestId: "r-happy", + kind: "completion", + modelId: "m1", + }); + return ctx; + } + const happy = await happyRun(); + t.is(happy.state, "completed"); +}); diff --git a/packages/sdk/types/bare-crypto/index.d.ts b/packages/sdk/types/bare-crypto/index.d.ts index 0508b8a8e7..2c29833522 100644 --- a/packages/sdk/types/bare-crypto/index.d.ts +++ b/packages/sdk/types/bare-crypto/index.d.ts @@ -8,6 +8,7 @@ declare module "bare-crypto" { export function createHash(algorithm: string): Hash; export function randomBytes(size: number): Buffer; + export function randomUUID(): string; export function createCipher( algorithm: string, password: string | Buffer, diff --git a/packages/sdk/utils/errors-server.ts b/packages/sdk/utils/errors-server.ts index 29229bb6c6..2608c2cd90 100644 --- a/packages/sdk/utils/errors-server.ts +++ b/packages/sdk/utils/errors-server.ts @@ -279,6 +279,30 @@ export class CancelFailedError extends QvacErrorBase { } } +export class RequestIdConflictError extends QvacErrorBase { + constructor(requestId: string, cause?: unknown) { + super( + createErrorOptions( + SDK_SERVER_ERROR_CODES.REQUEST_ID_CONFLICT, + [requestId], + cause, + ), + ); + } +} + +export class RequestNotFoundError extends QvacErrorBase { + constructor(requestId: string, cause?: unknown) { + super( + createErrorOptions( + SDK_SERVER_ERROR_CODES.REQUEST_NOT_FOUND, + [requestId], + cause, + ), + ); + } +} + export class TextToSpeechFailedError extends QvacErrorBase { constructor(details?: string, cause?: unknown) { super( From 513bb9e487bc21a3612948c66d6a52f8994a6c9b Mon Sep 17 00:00:00 2001 From: Simon Iribarren Date: Mon, 11 May 2026 12:18:44 +0200 Subject: [PATCH 2/6] QVAC-18181 fix: preserve addon-level cancel for non-migrated handlers and 
handle pre-aborted signals Three small follow-ups based on PR review: 1. server/bare/ops/cancel.ts now falls back to `model.addon.cancel()` when the registry has no match. Only the llama.cpp completion handler is wired through `registry.begin(...)` in 0.11.0; embeddings / transcription / translation / decoder / OCR / TTS handlers will follow in later milestones. Without this fallback, `cancel({ operation: "embeddings", modelId })` and the other non-migrated surfaces would become silent no-ops post-PR. 2. completion-stream.ts re-invokes `onAbort` synchronously when `signal.aborted` is already true at register time. `addEventListener("abort", ..., { once: true })` does not fire for an already-aborted signal, and the registry can hand the completion handler a born-aborted signal when `parentSignal` was already aborted at `begin(...)`. 3. Soften docstring claims about cancel-before-roundtrip in client/api/cancel.ts, examples/cancel-by-request-id.ts, schemas/completion-stream.ts, schemas/completion-event.ts, and server/bare/runtime/request-id.ts. The `requestId` is surfaced synchronously, but a `cancel({ requestId })` issued in the same tick may race the begin and is logged as a no-match. --- packages/sdk/client/api/cancel.ts | 3 ++ packages/sdk/examples/cancel-by-request-id.ts | 8 +++-- packages/sdk/schemas/completion-event.ts | 11 +++--- packages/sdk/schemas/completion-stream.ts | 2 +- packages/sdk/server/bare/ops/cancel.ts | 35 ++++++++++++++----- .../ops/completion-stream.ts | 8 +++++ .../sdk/server/bare/runtime/request-id.ts | 8 ++--- 7 files changed, 55 insertions(+), 20 deletions(-) diff --git a/packages/sdk/client/api/cancel.ts b/packages/sdk/client/api/cancel.ts index 5e86d31c45..9a3e4bf8b3 100644 --- a/packages/sdk/client/api/cancel.ts +++ b/packages/sdk/client/api/cancel.ts @@ -16,6 +16,9 @@ import { InvalidResponseError, CancelFailedError } from "@/utils/errors-client"; * `(await completion({ ... })).requestId`) to cancel exactly that * request. 
Either pass `{ requestId }` directly or the explicit * `{ operation: "request", requestId }` form; both are equivalent. + * The cancel takes effect once the server has begun the request; a + * cancel that races the originating call to the worker may arrive + * before the request is registered and is logged as a no-match. * - **By `modelId`** (broad-cancel escape hatch, kept indefinitely) — * `{ operation: "inference" | "embeddings", modelId }` cancels every * in-flight request running on that model. Useful for model unload, diff --git a/packages/sdk/examples/cancel-by-request-id.ts b/packages/sdk/examples/cancel-by-request-id.ts index 507d75e86c..b7c3deb662 100644 --- a/packages/sdk/examples/cancel-by-request-id.ts +++ b/packages/sdk/examples/cancel-by-request-id.ts @@ -9,9 +9,11 @@ * Two cancel paths exist: * * 1. `cancel({ requestId })` — targeted cancel, the primary path - * introduced in 0.11.0. Available as soon as `completion()` returns, - * before the first network round-trip — so a user clicking "stop" - * during the queueing phase still hits the right run. + * introduced in 0.11.0. The `requestId` is available synchronously + * on the `CompletionRun`, but the cancel only takes effect once the + * server has begun the request; a cancel issued in the same tick + * as `completion()` may arrive at the worker before the request is + * registered and is logged as a no-match. * 2. `cancel({ operation: "inference", modelId })` — broad cancel * (escape hatch, kept indefinitely). Cancels every inference running * on the model. 
Useful for unload, app shutdown, admin sweeps when diff --git a/packages/sdk/schemas/completion-event.ts b/packages/sdk/schemas/completion-event.ts index 21b7e09cf7..0e34d84387 100644 --- a/packages/sdk/schemas/completion-event.ts +++ b/packages/sdk/schemas/completion-event.ts @@ -135,10 +135,13 @@ export type CompletionFinal = { export type CompletionRun = { /** * Stable identifier for this run, generated client-side at call time - * (UUIDv4) and available synchronously the moment `completion(...)` - * returns — before the first network round-trip. Pass it to - * `cancel({ requestId })` to target this specific request without - * affecting any other inference running on the same model. + * (UUIDv4 when `crypto.randomUUID` is available, otherwise an opaque + * random hex token) and available synchronously the moment + * `completion(...)` returns. Pass it to `cancel({ requestId })` to + * target this specific request without affecting any other inference + * running on the same model. The cancel only takes effect once the + * server has begun the request, so a cancel issued in the same tick + * as `completion(...)` may race the begin and is logged as a no-match. */ requestId: string; /** Ordered stream of typed completion events — the canonical consumption API. */ diff --git a/packages/sdk/schemas/completion-stream.ts b/packages/sdk/schemas/completion-stream.ts index 72839c1999..e3edc9f1a1 100644 --- a/packages/sdk/schemas/completion-stream.ts +++ b/packages/sdk/schemas/completion-stream.ts @@ -180,7 +180,7 @@ const completionClientParamsBaseSchema = completionParamsSchema.extend({ .min(1) .optional() .describe( - "Stable identifier for this in-flight request, generated by the client at call time (UUIDv4). Surfaced on the `CompletionRun` result so callers can target it with `cancel({ requestId })`. 
Optional on the wire so legacy clients keep working — the server falls back to a server-generated id when the field is missing.", + "Stable identifier for this in-flight request, generated by the client at call time (opaque token, UUIDv4 when available). Surfaced on the `CompletionRun` result so callers can target it with `cancel({ requestId })`. Optional on the wire so legacy clients keep working — the server falls back to a server-generated id when the field is missing. Note: cancel-by-requestId only takes effect once the server has begun the request, so a cancel issued in the same tick as `completion()` may arrive before the request is registered.", ), }); diff --git a/packages/sdk/server/bare/ops/cancel.ts b/packages/sdk/server/bare/ops/cancel.ts index 6cd7fe61c9..96e9c3f94f 100644 --- a/packages/sdk/server/bare/ops/cancel.ts +++ b/packages/sdk/server/bare/ops/cancel.ts @@ -20,6 +20,14 @@ const logger = getServerLogger(); * path: the caller may not have a `requestId` to hand (model unload, * app shutdown, admin sweeps), and the escape hatch is cheap because * the registry already does the matching. + * + * Compatibility fallback: only the llama.cpp completion handler routes + * through the registry in 0.11.0; embeddings / transcription / + * translation / decoder / OCR / TTS handlers will follow in later + * milestones. Until then, a `modelId`-targeted cancel that finds zero + * registry matches falls back to the pre-0.11.0 behavior of calling + * `model.addon.cancel()` directly, so the wire contract for those + * surfaces does not regress while the migration is in flight. */ export function cancel( params: CancelInferenceBaseParams, @@ -33,17 +41,28 @@ export function cancel( } const registry = getRequestRegistry(); - const target = opts?.kind - ? { modelId, kind: opts.kind } - : { modelId }; + const target = opts?.kind ? 
{ modelId, kind: opts.kind } : { modelId }; const cancelled = registry.cancel(target); - // No active request to cancel is not a hard error — callers (workbench - // "Stop" button, app shutdown sweeps) often fire-and-forget. Log so - // operators can see when a cancel landed against an empty registry. - if (cancelled === 0) { + if (cancelled > 0) return; + + // No registry match: a request kind whose handler hasn't been migrated + // onto `registry.begin(...)` yet (everything except llama.cpp + // completion in 0.11.0). Fire the addon-level cancel directly so the + // pre-registry behavior is preserved. + const addon = model.addon; + if (addon?.cancel) { + void addon.cancel.call(addon); logger.debug( - `[cancel] no in-flight request matched modelId=${modelId}${opts?.kind ? ` kind=${opts.kind}` : ""}`, + `[cancel] no registry match for modelId=${modelId}${opts?.kind ? ` kind=${opts.kind}` : ""} — fell back to addon.cancel()`, ); + return; } + + // Callers (workbench "Stop" button, app shutdown sweeps) often + // fire-and-forget; log so operators can see when a cancel landed + // against a registry with nothing in flight and no addon-level cancel. + logger.debug( + `[cancel] no in-flight request matched modelId=${modelId}${opts?.kind ? 
` kind=${opts.kind}` : ""}`, + ); } diff --git a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts index bbb1bd33cc..aea284eabc 100644 --- a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts +++ b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts @@ -504,6 +504,14 @@ export async function* completion( } }; signal.addEventListener("abort", onAbort, { once: true }); + // `addEventListener("abort", ..., { once: true })` does *not* fire if + // the signal is already aborted at register time — but the registry + // synchronously aborts a fresh controller when `parentSignal` was + // already aborted at `begin(...)`. Without this fall-through, the + // addon would keep decoding until `shouldRecordSavedCount` notices + // post-loop. Re-using `onAbort` here keeps the listener body as the + // single source of truth for "what cancel does." + if (signal.aborted) onAbort(); if (kvCache) { const systemPromptFromHistory = extractSystemPrompt(history); diff --git a/packages/sdk/server/bare/runtime/request-id.ts b/packages/sdk/server/bare/runtime/request-id.ts index fb38de732f..5de6bd3701 100644 --- a/packages/sdk/server/bare/runtime/request-id.ts +++ b/packages/sdk/server/bare/runtime/request-id.ts @@ -2,10 +2,10 @@ import { randomUUID } from "bare-crypto"; /** * Server-side fallback for `requestId`. The new wire contract is that - * the client generates the id (UUIDv4) at call time so it's available - * synchronously before the first network round-trip. To keep older - * clients working, the request schema marks `requestId` optional and the - * server fills it in here when it's missing. + * the client generates the id (UUIDv4) at call time so it's surfaced + * synchronously on the `CompletionRun` for use with `cancel({ requestId })`. 
+ * To keep older clients working, the request schema marks `requestId` + * optional and the server fills it in here when it's missing. * * `bare-crypto.randomUUID()` mirrors Node's `crypto.randomUUID()` and is * Bare-runtime safe. Returns a v4 UUID. From c187abe62632299314bcf92c0e3f6233858bf41d Mon Sep 17 00:00:00 2001 From: Simon Iribarren Date: Mon, 11 May 2026 13:38:02 +0200 Subject: [PATCH 3/6] QVAC-18181 fix: import AbortController from bare-abort-controller Bare runtime does not expose AbortController as a global, so the registry's `new AbortController()` call in `begin(...)` failed at runtime on every completion (`ReferenceError: AbortController is not defined`). Use the existing `bare-abort-controller` dep (already used by download-manager and rag-operation-manager) and add matching `AbortSignal` type imports to the two other new/touched files for consistency with the rest of the Bare-server codebase. This was missed because: - Unit tests run in Bun where AbortController is global. - TypeScript typechecks against lib.dom where AbortController is global. - tests-qvac (the only CI surface that loads into actual Bare) is skipping on this PR, so the integration path that exercises `registry.begin(...)` was never run. 
--- .../bare/plugins/llamacpp-completion/ops/completion-stream.ts | 1 + packages/sdk/server/bare/runtime/request-context.ts | 1 + packages/sdk/server/bare/runtime/request-registry.ts | 4 ++++ 3 files changed, 6 insertions(+) diff --git a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts index aea284eabc..4c1573ae58 100644 --- a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts +++ b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts @@ -1,3 +1,4 @@ +import type { AbortSignal } from "bare-abort-controller"; import type { RunOptions } from "@qvac/llm-llamacpp"; import type { CompletionParams, diff --git a/packages/sdk/server/bare/runtime/request-context.ts b/packages/sdk/server/bare/runtime/request-context.ts index cde61b45f4..d0871239bf 100644 --- a/packages/sdk/server/bare/runtime/request-context.ts +++ b/packages/sdk/server/bare/runtime/request-context.ts @@ -1,3 +1,4 @@ +import type { AbortSignal } from "bare-abort-controller"; import type { DisposableScope } from "@/server/bare/runtime/disposable-scope"; /** diff --git a/packages/sdk/server/bare/runtime/request-registry.ts b/packages/sdk/server/bare/runtime/request-registry.ts index f4329b9adb..58e374ed63 100644 --- a/packages/sdk/server/bare/runtime/request-registry.ts +++ b/packages/sdk/server/bare/runtime/request-registry.ts @@ -1,3 +1,7 @@ +import { + AbortController, + type AbortSignal, +} from "bare-abort-controller"; import { createDisposableScope, type DisposableScope, From a120c96af0aa7c2843a85ba8c7a0e78cdade3e9b Mon Sep 17 00:00:00 2001 From: Simon Iribarren Date: Mon, 11 May 2026 21:29:26 +0200 Subject: [PATCH 4/6] QVAC-18181 fix: guard against missing Symbol.asyncDispose at module load MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The request-lifecycle stack depends on Symbol.asyncDispose for 
every `await using ctx = registry.begin(...)` site. If the host runtime doesn't expose it (older Bare/Expo build, missing polyfill), the `[Symbol.asyncDispose]:` property key in disposable-scope.ts silently coerces to the string "undefined" and produces objects that look async-disposable but are not — handlers would leak registry entries forever. Add an AsyncDisposeUnavailableError (code 53503) and a module-load guard in disposable-scope.ts that throws clearly at import time if the symbol is missing. Adds a tripwire unit test asserting `typeof Symbol.asyncDispose === "symbol"` so any regression in the Bun test runtime or future polyfill story fails the suite first. Follow-up to Yury's pitch review (Bare/Expo runtime support). --- packages/sdk/schemas/sdk-errors-server.ts | 6 ++++++ .../sdk/server/bare/runtime/disposable-scope.ts | 17 +++++++++++++++++ .../test/unit/runtime/disposable-scope.test.ts | 12 ++++++++++++ packages/sdk/utils/errors-server.ts | 12 ++++++++++++ 4 files changed, 47 insertions(+) diff --git a/packages/sdk/schemas/sdk-errors-server.ts b/packages/sdk/schemas/sdk-errors-server.ts index f39d1edd5f..ff9d905899 100644 --- a/packages/sdk/schemas/sdk-errors-server.ts +++ b/packages/sdk/schemas/sdk-errors-server.ts @@ -87,6 +87,7 @@ export const SDK_SERVER_ERROR_CODES = { FFMPEG_NOT_AVAILABLE: 53500, AUDIO_PLAYER_FAILED: 53501, INVALID_AUDIO_CHUNK_TYPE: 53502, + ASYNC_DISPOSE_UNAVAILABLE: 53503, // RPC/Delegation (Server-side) (53,700-53,849) DELEGATE_NO_FINAL_RESPONSE: 53700, @@ -474,6 +475,11 @@ const serverErrorDefinitions: ErrorCodesMap = { name: "INVALID_AUDIO_CHUNK_TYPE", message: "Invalid audio chunk type", }, + [SDK_SERVER_ERROR_CODES.ASYNC_DISPOSE_UNAVAILABLE]: { + name: "ASYNC_DISPOSE_UNAVAILABLE", + message: + "Host runtime does not expose Symbol.asyncDispose; the SDK request-lifecycle primitives require ES2024 `using`/`asyncDispose` support. 
Verify your runtime (Bare/Expo/Node ≥ 20.4) and any polyfill registration.", + }, // RPC/Delegation (Server-side) (53,700-53,899) [SDK_SERVER_ERROR_CODES.DELEGATE_NO_FINAL_RESPONSE]: { diff --git a/packages/sdk/server/bare/runtime/disposable-scope.ts b/packages/sdk/server/bare/runtime/disposable-scope.ts index 3d6ee9f6c7..bc2bcbe08e 100644 --- a/packages/sdk/server/bare/runtime/disposable-scope.ts +++ b/packages/sdk/server/bare/runtime/disposable-scope.ts @@ -1,3 +1,20 @@ +import { AsyncDisposeUnavailableError } from "@/utils/errors-server"; + +/** + * Module-load guard. The whole request-lifecycle primitive stack (scopes, + * the registry's `ManagedRequestContext`, `await using ctx = ...`) is + * built on `Symbol.asyncDispose` (TC39 Explicit Resource Management). If the host + * runtime doesn't expose it (older Bare/Expo build, missing polyfill), + * the `[Symbol.asyncDispose]:` property key in this file's `dispose` + * function would coerce to the string `"undefined"` and silently produce + * objects that look async-disposable but are not — handlers would leak + * registry entries forever. Better to fail loudly at import time with a + * clear error than to debug a slow registry leak in production. + */ +if (typeof Symbol.asyncDispose !== "symbol") { + throw new AsyncDisposeUnavailableError(); +} + /** * Bounded-lifetime cleanup scope for an in-flight request. * diff --git a/packages/sdk/test/unit/runtime/disposable-scope.test.ts b/packages/sdk/test/unit/runtime/disposable-scope.test.ts index a252f0ca05..b4f4e41476 100644 --- a/packages/sdk/test/unit/runtime/disposable-scope.test.ts +++ b/packages/sdk/test/unit/runtime/disposable-scope.test.ts @@ -29,6 +29,18 @@ type T = { fail: (msg?: string) => void; }; +test("disposable-scope: host runtime exposes Symbol.asyncDispose", (t: T) => { + // Tripwire for the module-load guard in disposable-scope.ts. 
If a future + // runtime upgrade strips Symbol.asyncDispose (older Bare/Expo, missing + // polyfill), the guard throws at SDK import time and this test fails first. + // The guard converts a silent registry-leak bug into a loud startup error. + t.is( + typeof Symbol.asyncDispose, + "symbol", + "Symbol.asyncDispose must be a symbol; the SDK request-lifecycle stack depends on it", + ); +}); + test("disposable-scope: cleanups run in LIFO order", async (t: T) => { const order: string[] = []; const scope = createDisposableScope(); diff --git a/packages/sdk/utils/errors-server.ts b/packages/sdk/utils/errors-server.ts index 2608c2cd90..ed85f8b287 100644 --- a/packages/sdk/utils/errors-server.ts +++ b/packages/sdk/utils/errors-server.ts @@ -303,6 +303,18 @@ export class RequestNotFoundError extends QvacErrorBase { } } +export class AsyncDisposeUnavailableError extends QvacErrorBase { + constructor(cause?: unknown) { + super( + createErrorOptions( + SDK_SERVER_ERROR_CODES.ASYNC_DISPOSE_UNAVAILABLE, + [], + cause, + ), + ); + } +} + export class TextToSpeechFailedError extends QvacErrorBase { constructor(details?: string, cause?: unknown) { super( From 709008e45644d620fa9795d9f2bb8478f1aaef46 Mon Sep 17 00:00:00 2001 From: Simon Iribarren Date: Tue, 12 May 2026 15:39:45 +0200 Subject: [PATCH 5/6] QVAC-18181 fix: restore addon-cancel await in fallback + detach abort listener + tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses Yury's PR #1949 review comments: - `server/bare/ops/cancel.ts`: make `cancel()` async again and `await addon.cancel.call(addon)` in the M1 compat fallback. Dropping the await turned the cancel RPC for embeddings / transcription / translation / decoder / OCR / TTS into a fire-and-forget — the RPC resolved before the addon had acknowledged the cancel, breaking the wire contract 513bb9e promised the fallback would preserve until M3b/M3c migrate those handlers. 
- `server/rpc/handlers/cancelHandler.ts`: pair-fix — `await cancel(...)` in both `case "inference"` and `case "embeddings"` so the await actually propagates to the RPC response. Function is async now and returns the response object directly. - `server/bare/plugins/llamacpp-completion/ops/completion-stream.ts`: wrap the body in `try { ... } finally { signal.removeEventListener(...) }` so the abort listener is detached on every exit path (happy completion, thrown error, generator `return()`). `{ once: true }` already removes the listener if the signal fires; the finally is the cleanup hook for the signal-never-fired path — mirrors the registry's own `detachParent` discipline. - `test/unit/runtime/request-registry.test.ts`: two new tests filling the gaps Yury called out: - `end() detaches parent listener` — verifies via add/remove counters that a long-lived `parentSignal` doesn't accumulate listeners across many begin/end cycles. Pins the contract `detachParent` exists to enforce. - `same-tick cancel-before-begin` — pins the M1 contract that a `cancel({ requestId })` issued before its `begin(...)` lands on the server returns 0 and does not retroactively abort the later `begin(...)`. Documents the Stop-button race that M2's typed cancel outcomes will close. Lint, typecheck, and all 694 unit tests pass. --- packages/sdk/server/bare/ops/cancel.ts | 11 +- .../ops/completion-stream.ts | 419 +++++++++--------- .../sdk/server/rpc/handlers/cancelHandler.ts | 20 +- .../unit/runtime/request-registry.test.ts | 70 +++ 4 files changed, 304 insertions(+), 216 deletions(-) diff --git a/packages/sdk/server/bare/ops/cancel.ts b/packages/sdk/server/bare/ops/cancel.ts index 96e9c3f94f..5194478c97 100644 --- a/packages/sdk/server/bare/ops/cancel.ts +++ b/packages/sdk/server/bare/ops/cancel.ts @@ -29,10 +29,10 @@ const logger = getServerLogger(); * `model.addon.cancel()` directly, so the wire contract for those * surfaces does not regress while the migration is in flight. 
*/ -export function cancel( +export async function cancel( params: CancelInferenceBaseParams, opts?: { kind?: RequestKind }, -) { +): Promise { const { modelId } = cancelInferenceBaseSchema.parse(params); const model = getModel(modelId); @@ -49,10 +49,13 @@ export function cancel( // No registry match: a request kind whose handler hasn't been migrated // onto `registry.begin(...)` yet (everything except llama.cpp // completion in 0.11.0). Fire the addon-level cancel directly so the - // pre-registry behavior is preserved. + // pre-registry behavior is preserved — including awaiting acknowledgement, + // which is the wire contract callers relied on before this PR (the RPC + // response resolves once the addon has flipped its cancel flag, not + // beforehand). const addon = model.addon; if (addon?.cancel) { - void addon.cancel.call(addon); + await addon.cancel.call(addon); logger.debug( `[cancel] no registry match for modelId=${modelId}${opts?.kind ? ` kind=${opts.kind}` : ""} — fell back to addon.cancel()`, ); diff --git a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts index 4c1573ae58..93d683022d 100644 --- a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts +++ b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts @@ -355,8 +355,7 @@ function prepareMessagesForCache( if (lastMsg.role === "user") { const prevMsg = history[history.length - 2]; - const tail = - prevMsg?.role === "assistant" ? [prevMsg, lastMsg] : [lastMsg]; + const tail = prevMsg?.role === "assistant" ? [prevMsg, lastMsg] : [lastMsg]; return [...transformMessages(tail), ...addTools]; } @@ -514,239 +513,249 @@ export async function* completion( // single source of truth for "what cancel does." 
if (signal.aborted) onAbort(); - if (kvCache) { - const systemPromptFromHistory = extractSystemPrompt(history); - // Dynamic mode lets each turn carry its own tool set, so the cache - // hash must not depend on the tool list — otherwise a tool change - // would force a fresh cache file and defeat the whole optimisation. - const configHash = generateConfigHash( - systemPromptFromHistory, - dynamicTools ? undefined : tools, - ); - - const systemPromptToUse = - systemPromptFromHistory || - (modelConfig as { system_prompt?: string }).system_prompt || - "You are a helpful assistant."; - - let cachePathToUse: string; - - if (typeof kvCache === "string") { - cachePathToUse = await getCacheFilePath(modelId, configHash, kvCache); - let cacheExists = await customCacheExists(modelId, configHash, kvCache); - logCacheStatus(kvCache, cacheExists); - - if (!cacheExists) { - await initSystemPromptCache( - model, - cachePathToUse, - systemPromptToUse, - kvCache, - // Static-mode tools are baked into the system-prompt cache so - // they're shared across the session. Dynamic-mode tools belong - // to a per-turn anchor and must not enter the system cache. - staticTools ? tools : undefined, - ); - markCacheInitialized(modelId, configHash, kvCache); - cacheExists = true; - } - - const messagesToSend = prepareMessagesForCache( - cachePathToUse, - cacheExists, - history, - dynamicTools ? tools : undefined, - ); - logMessagesToAddon(messagesToSend, "PROMPT_SEND"); - - const result = yield* processModelResponse( - model, - messagesToSend, - tools, - mergedGenerationParams, - { cacheKey: cachePathToUse, saveCacheToDisk: true }, - dialect, + // Wrap the body so the `abort` listener is detached on every exit path + // (happy completion, thrown error, generator `return()` from upstream). 
+ // `{ once: true }` already removes the listener if the signal fires, so + // the `removeEventListener` here is the cleanup hook for the + // signal-never-fired path — mirrors the registry's own `detachParent` + // discipline and stops relying on GC for cleanup. + try { + if (kvCache) { + const systemPromptFromHistory = extractSystemPrompt(history); + // Dynamic mode lets each turn carry its own tool set, so the cache + // hash must not depend on the tool list — otherwise a tool change + // would force a fresh cache file and defeat the whole optimisation. + const configHash = generateConfigHash( + systemPromptFromHistory, + dynamicTools ? undefined : tools, ); - if (shouldRecordSavedCount(signal, result.producedTokens)) { - // Turn ran to completion and produced content — record the new - // boundary so the next turn can slice its history. - await recordCacheSaveCount(cachePathToUse, history.length + 1); - } else { - // The addon writes the cache file unconditionally on - // `saveCacheToDisk` turns, including cancellations and zero-token - // exits, so what's left on disk holds partial decode state that - // does not correspond to a clean turn boundary. Mirror the - // auto-key handling: drop the file, clear the in-memory init - // flag (otherwise `customCacheExists` would still report true), - // and forget the saved count. Next turn re-primes the system - // prompt cleanly — a one-turn perf hit, but no risk of the - // addon loading the stale KV state. - try { - await fsPromises.unlink(cachePathToUse); - } catch (unlinkError) { - logger.warn( - `[kv-cache] Failed to remove cache file after cancelled or empty custom-key turn; next turn may load stale KV state. path=${cachePathToUse} error=${unlinkError instanceof Error ? 
unlinkError.message : String(unlinkError)}`, + const systemPromptToUse = + systemPromptFromHistory || + (modelConfig as { system_prompt?: string }).system_prompt || + "You are a helpful assistant."; + + let cachePathToUse: string; + + if (typeof kvCache === "string") { + cachePathToUse = await getCacheFilePath(modelId, configHash, kvCache); + let cacheExists = await customCacheExists(modelId, configHash, kvCache); + logCacheStatus(kvCache, cacheExists); + + if (!cacheExists) { + await initSystemPromptCache( + model, + cachePathToUse, + systemPromptToUse, + kvCache, + // Static-mode tools are baked into the system-prompt cache so + // they're shared across the session. Dynamic-mode tools belong + // to a per-turn anchor and must not enter the system cache. + staticTools ? tools : undefined, ); + markCacheInitialized(modelId, configHash, kvCache); + cacheExists = true; } - clearCacheRegistry({ cacheKey: kvCache, modelId }); - cachedMessageCounts.delete(cachePathToUse); - } - return result; - } else { - // Auto-generate cache key based on conversation history - const cacheMessages: CacheMessage[] = history.map((msg) => ({ - role: msg.role, - content: msg.content, - attachments: msg.attachments ?? undefined, - })); - - const existingCache = await findMatchingCache( - modelId, - configHash, - cacheMessages, - ); - const preResponseCacheInfo = await getCurrentCacheInfo( - modelId, - configHash, - cacheMessages, - ); - - cachePathToUse = - existingCache !== null - ? existingCache.cachePath - : preResponseCacheInfo.cachePath; - let cacheExists = existingCache !== null; - logCacheStatus("auto", cacheExists); + const messagesToSend = prepareMessagesForCache( + cachePathToUse, + cacheExists, + history, + dynamicTools ? tools : undefined, + ); + logMessagesToAddon(messagesToSend, "PROMPT_SEND"); - if (!cacheExists) { - await initSystemPromptCache( + const result = yield* processModelResponse( model, - cachePathToUse, - systemPromptToUse, - "auto", - staticTools ? 
tools : undefined, + messagesToSend, + tools, + mergedGenerationParams, + { cacheKey: cachePathToUse, saveCacheToDisk: true }, + dialect, + ); + + if (shouldRecordSavedCount(signal, result.producedTokens)) { + // Turn ran to completion and produced content — record the new + // boundary so the next turn can slice its history. + await recordCacheSaveCount(cachePathToUse, history.length + 1); + } else { + // The addon writes the cache file unconditionally on + // `saveCacheToDisk` turns, including cancellations and zero-token + // exits, so what's left on disk holds partial decode state that + // does not correspond to a clean turn boundary. Mirror the + // auto-key handling: drop the file, clear the in-memory init + // flag (otherwise `customCacheExists` would still report true), + // and forget the saved count. Next turn re-primes the system + // prompt cleanly — a one-turn perf hit, but no risk of the + // addon loading the stale KV state. + try { + await fsPromises.unlink(cachePathToUse); + } catch (unlinkError) { + logger.warn( + `[kv-cache] Failed to remove cache file after cancelled or empty custom-key turn; next turn may load stale KV state. path=${cachePathToUse} error=${unlinkError instanceof Error ? unlinkError.message : String(unlinkError)}`, + ); + } + clearCacheRegistry({ cacheKey: kvCache, modelId }); + cachedMessageCounts.delete(cachePathToUse); + } + return result; + } else { + // Auto-generate cache key based on conversation history + const cacheMessages: CacheMessage[] = history.map((msg) => ({ + role: msg.role, + content: msg.content, + attachments: msg.attachments ?? 
undefined, + })); + + const existingCache = await findMatchingCache( + modelId, + configHash, + cacheMessages, ); - markCacheInitialized( + const preResponseCacheInfo = await getCurrentCacheInfo( modelId, configHash, - preResponseCacheInfo.cacheKey, + cacheMessages, ); - cacheExists = true; - } - const messagesToSend = prepareMessagesForCache( - cachePathToUse, - cacheExists, - history, - dynamicTools ? tools : undefined, - ); - logMessagesToAddon(messagesToSend, "PROMPT_SEND"); + cachePathToUse = + existingCache !== null + ? existingCache.cachePath + : preResponseCacheInfo.cachePath; + + let cacheExists = existingCache !== null; + logCacheStatus("auto", cacheExists); + + if (!cacheExists) { + await initSystemPromptCache( + model, + cachePathToUse, + systemPromptToUse, + "auto", + staticTools ? tools : undefined, + ); + markCacheInitialized( + modelId, + configHash, + preResponseCacheInfo.cacheKey, + ); + cacheExists = true; + } - const result = yield* processModelResponse( - model, - messagesToSend, - tools, - mergedGenerationParams, - { cacheKey: cachePathToUse, saveCacheToDisk: true }, - dialect, - ); + const messagesToSend = prepareMessagesForCache( + cachePathToUse, + cacheExists, + history, + dynamicTools ? tools : undefined, + ); + logMessagesToAddon(messagesToSend, "PROMPT_SEND"); - // TODO: support auto-cache for tool-call turns by keying off the - // structured assistant/tool messages callers push into history, - // not result.responseText (which is raw tool-call markup here). - // Until then, remove any cache file the addon wrote so it doesn't - // leak on disk (the next turn would compute a different key and - // never reach it). - if (result.toolCalls.length > 0) { - logger.warn( - `[kv-cache] Auto cache tool-call turn; removing orphaned cache to avoid disk leak. 
path=${cachePathToUse}`, + const result = yield* processModelResponse( + model, + messagesToSend, + tools, + mergedGenerationParams, + { cacheKey: cachePathToUse, saveCacheToDisk: true }, + dialect, ); - try { - await fsPromises.unlink(cachePathToUse); - } catch (unlinkError) { - logger.warn( - `[kv-cache] Failed to remove orphaned tool-turn cache file; disk leak likely. path=${cachePathToUse} error=${unlinkError instanceof Error ? unlinkError.message : String(unlinkError)}`, - ); - } - cachedMessageCounts.delete(cachePathToUse); - return result; - } - // A cancelled or zero-token turn cannot be promoted to a post-response - // cache: the post-response key is derived from `result.responseText`, - // which is empty/partial in those cases, and the on-disk cache the - // addon wrote is not aligned with the current-history hash. Treat it - // like the tool-call branch — drop the cache file and clear the count. - if (!shouldRecordSavedCount(signal, result.producedTokens)) { - try { - await fsPromises.unlink(cachePathToUse); - } catch (unlinkError) { + // TODO: support auto-cache for tool-call turns by keying off the + // structured assistant/tool messages callers push into history, + // not result.responseText (which is raw tool-call markup here). + // Until then, remove any cache file the addon wrote so it doesn't + // leak on disk (the next turn would compute a different key and + // never reach it). + if (result.toolCalls.length > 0) { logger.warn( - `[kv-cache] Failed to remove cache file after cancelled or empty turn; disk leak possible. path=${cachePathToUse} error=${unlinkError instanceof Error ? unlinkError.message : String(unlinkError)}`, + `[kv-cache] Auto cache tool-call turn; removing orphaned cache to avoid disk leak. path=${cachePathToUse}`, ); + try { + await fsPromises.unlink(cachePathToUse); + } catch (unlinkError) { + logger.warn( + `[kv-cache] Failed to remove orphaned tool-turn cache file; disk leak likely. 
path=${cachePathToUse} error=${unlinkError instanceof Error ? unlinkError.message : String(unlinkError)}`, + ); + } + cachedMessageCounts.delete(cachePathToUse); + return result; } - cachedMessageCounts.delete(cachePathToUse); - return result; - } - const savedHistory = buildAutoCacheSaveHistory( - cacheMessages, - result.responseText, - ); - const postResponseCacheInfo = await getCurrentCacheInfo( - modelId, - configHash, - savedHistory, - ); + // A cancelled or zero-token turn cannot be promoted to a post-response + // cache: the post-response key is derived from `result.responseText`, + // which is empty/partial in those cases, and the on-disk cache the + // addon wrote is not aligned with the current-history hash. Treat it + // like the tool-call branch — drop the cache file and clear the count. + if (!shouldRecordSavedCount(signal, result.producedTokens)) { + try { + await fsPromises.unlink(cachePathToUse); + } catch (unlinkError) { + logger.warn( + `[kv-cache] Failed to remove cache file after cancelled or empty turn; disk leak possible. path=${cachePathToUse} error=${unlinkError instanceof Error ? unlinkError.message : String(unlinkError)}`, + ); + } + cachedMessageCounts.delete(cachePathToUse); + return result; + } - if ( - !(await renameCacheFile( - cachePathToUse, - postResponseCacheInfo.cachePath, - )) - ) { - logger.warn( - `[kv-cache] Auto cache rename failed; removing stale cache to avoid disk leak. from=${cachePathToUse} to=${postResponseCacheInfo.cachePath}`, + const savedHistory = buildAutoCacheSaveHistory( + cacheMessages, + result.responseText, ); - try { - await fsPromises.unlink(cachePathToUse); - } catch (unlinkError) { + const postResponseCacheInfo = await getCurrentCacheInfo( + modelId, + configHash, + savedHistory, + ); + + if ( + !(await renameCacheFile( + cachePathToUse, + postResponseCacheInfo.cachePath, + )) + ) { logger.warn( - `[kv-cache] Failed to remove stale cache file; disk leak likely. 
path=${cachePathToUse} error=${unlinkError instanceof Error ? unlinkError.message : String(unlinkError)}`, + `[kv-cache] Auto cache rename failed; removing stale cache to avoid disk leak. from=${cachePathToUse} to=${postResponseCacheInfo.cachePath}`, ); + try { + await fsPromises.unlink(cachePathToUse); + } catch (unlinkError) { + logger.warn( + `[kv-cache] Failed to remove stale cache file; disk leak likely. path=${cachePathToUse} error=${unlinkError instanceof Error ? unlinkError.message : String(unlinkError)}`, + ); + } + cachedMessageCounts.delete(cachePathToUse); + return result; } + cachedMessageCounts.delete(cachePathToUse); + await recordCacheSaveCount( + postResponseCacheInfo.cachePath, + savedHistory.length, + ); + return result; } + } else { + let historyWithTools: Array = history; + if (staticTools && tools) { + historyWithTools = prependToolsToHistory(history, tools); + } else if (dynamicTools && tools) { + historyWithTools = appendToolsToHistory(history, tools); + } - cachedMessageCounts.delete(cachePathToUse); - await recordCacheSaveCount( - postResponseCacheInfo.cachePath, - savedHistory.length, + const transformedHistory = transformMessages(historyWithTools); + logCacheDisabled(); + logMessagesToAddon(transformedHistory, "NO_CACHE"); + return yield* processModelResponse( + model, + transformedHistory, + tools, + mergedGenerationParams, + undefined, + dialect, ); - - return result; - } - } else { - let historyWithTools: Array = history; - if (staticTools && tools) { - historyWithTools = prependToolsToHistory(history, tools); - } else if (dynamicTools && tools) { - historyWithTools = appendToolsToHistory(history, tools); } - - const transformedHistory = transformMessages(historyWithTools); - logCacheDisabled(); - logMessagesToAddon(transformedHistory, "NO_CACHE"); - return yield* processModelResponse( - model, - transformedHistory, - tools, - mergedGenerationParams, - undefined, - dialect, - ); + } finally { + signal.removeEventListener("abort", 
onAbort); } } diff --git a/packages/sdk/server/rpc/handlers/cancelHandler.ts b/packages/sdk/server/rpc/handlers/cancelHandler.ts index 62c7d74fb9..f0ee678ff1 100644 --- a/packages/sdk/server/rpc/handlers/cancelHandler.ts +++ b/packages/sdk/server/rpc/handlers/cancelHandler.ts @@ -10,16 +10,22 @@ import { getServerLogger } from "@/logging"; const logger = getServerLogger(); -export function cancelHandler( +export async function cancelHandler( request: CancelRequest, ): Promise { try { switch (request.operation) { case "inference": - cancel({ modelId: request.modelId }, { kind: "completion" }); + // Awaited so the RPC response resolves after the addon has + // acknowledged the cancel for non-registry-migrated handlers + // (embeddings / transcription / translation / decoder / OCR / TTS + // until M3b/M3c). The registry-routed path inside `cancel()` is + // already synchronous w.r.t. the abort, so the await is a no-op + // for completion-stream's signal-driven cancel. + await cancel({ modelId: request.modelId }, { kind: "completion" }); break; case "embeddings": - cancel({ modelId: request.modelId }, { kind: "embeddings" }); + await cancel({ modelId: request.modelId }, { kind: "embeddings" }); break; case "request": { const cancelled = getRequestRegistry().cancel({ @@ -46,16 +52,16 @@ export function cancelHandler( } } - return Promise.resolve({ + return { type: "cancel", success: true, - }); + }; } catch (error) { logger.error("Error during cancellation:", error); - return Promise.resolve({ + return { type: "cancel", success: false, error: error instanceof Error ? 
error.message : "Unknown error", - }); + }; } } diff --git a/packages/sdk/test/unit/runtime/request-registry.test.ts b/packages/sdk/test/unit/runtime/request-registry.test.ts index 9655ba58cc..86fa0f4385 100644 --- a/packages/sdk/test/unit/runtime/request-registry.test.ts +++ b/packages/sdk/test/unit/runtime/request-registry.test.ts @@ -235,6 +235,76 @@ test("registry: end without prior begin is a no-op", async (t: T) => { t.is(r.list().length, 0); }); +test("registry: end() detaches parent listener so long-lived parents don't accumulate listeners", async (t: T) => { + // The `parentSignal` composition exists so a worker-level shutdown + // signal can compose into per-request signals. Without an explicit + // `detachParent` discipline, every `begin(...)` would leave a listener + // on the long-lived parent for the lifetime of the worker — a slow + // O(n requests) leak that's invisible until production-scale traffic. + // Verify the listener is removed on the request's `end()` path. + const parent = new AbortController(); + let adds = 0; + let removes = 0; + const origAdd = parent.signal.addEventListener.bind(parent.signal); + const origRemove = parent.signal.removeEventListener.bind(parent.signal); + parent.signal.addEventListener = ((...args: Parameters) => { + adds++; + return origAdd(...args); + }) as typeof parent.signal.addEventListener; + parent.signal.removeEventListener = (( + ...args: Parameters + ) => { + removes++; + return origRemove(...args); + }) as typeof parent.signal.removeEventListener; + + const r = createRequestRegistry(); + for (let i = 0; i < 5; i++) { + const id = `r-${i}`; + const ctx = r.begin({ + requestId: id, + kind: "completion", + modelId: "m1", + parentSignal: parent.signal, + }); + t.is(ctx.state, "running"); + await r.end(id, "completed"); + } + t.is(adds, 5, "each begin() with parentSignal registered one listener"); + t.is( + removes, + 5, + "each end() removed it — long-lived parent doesn't accumulate listeners", + ); +}); + 
+test("registry: same-tick cancel-before-begin returns 0 and does not retroactively abort the later begin()", async (t: T) => { + // Documents the current M1 behavior of the Stop-button race the + // synchronous-`requestId` design property allows: client generates a + // `requestId` and immediately fires `cancel({ requestId })` before the + // server-side `begin(...)` lands. The registry has nothing to match, + // so the cancel is a no-op — the subsequent `begin(...)` runs to + // completion. M2's typed-cancel outcomes will close this gap (likely + // via a small bounded "cancelled-before-begin" set checked by + // `begin(...)`); this test pins the current contract so the M2 change + // surfaces here. + const r = createRequestRegistry(); + const cancelled = r.cancel({ requestId: "r-1" }); + t.is(cancelled, 0, "no entry yet — cancel returns 0"); + + await using ctx = r.begin({ + requestId: "r-1", + kind: "completion", + modelId: "m1", + }); + t.is( + ctx.signal.aborted, + false, + "subsequent begin() is not retroactively aborted by the pre-begin cancel", + ); + t.is(ctx.state, "running"); +}); + test("registry: derived terminal state is 'cancelled' if signal aborted, 'completed' otherwise", async (t: T) => { const r = createRequestRegistry(); From e70c46e366f306fc3b9406840c7f4835da0e1f85 Mon Sep 17 00:00:00 2001 From: Simon Iribarren Date: Tue, 12 May 2026 16:11:37 +0200 Subject: [PATCH 6/6] QVAC-18181 chore: address Opanin's review + add Cursor rules for primitives MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves https://github.com/tetherto/qvac/pull/1949#discussion_r3226842363 and https://github.com/tetherto/qvac/pull/1949#discussion_r3226856787. Code: - cancelHandler: add exhaustive `default` to the operation switch so any future `CancelRequest` op that isn't added here surfaces as `success: false` with a descriptive error instead of silently returning `success: true`. 
The `never` assignment makes the drift a compile-time failure too. No-plain-Error rule respected — branch returns a typed `CancelResponse`, no `throw`. - completion-stream onAbort: replace `void addon.cancel.call(addon)` with `.catch(...)` so a rejection from the addon's cancel doesn't leak as an unhandledRejection. Listener stays best-effort fire-and-forget (event listeners can't await) but errors are now logged instead of swallowed. Docs: - `.cursor/rules/sdk/request-lifecycle-primitives.mdc` — auto-firing rule for `packages/sdk/server/bare/**`. Canonical handler shape, signal-leaf discipline, anti-patterns (no `if (signal.aborted) return` polling, no manual cancel cleanup, no AbortController passing, no plain Error throw), cancel API surface, primitives reference, error codes. - `.cursor/rules/sdk/docs/request-lifecycle-system.mdc` — full topic doc. Design rationale (what this replaces, why three primitives and not a framework, the build-vs-reuse calculus), migration roadmap (M1/M2/M3a-d), FAQ (singleton vs factory, caller-provided ids, `await using` semantics, runtime guard rationale, addon cancel relationship, known cancel-before-begin race). Both rules cross-link to `error-handling.mdc` and the existing `kv-cache-system.mdc` so agents have the full picture from any entry point. 
--- .../sdk/docs/request-lifecycle-system.mdc | 166 +++++++++++ .../sdk/request-lifecycle-primitives.mdc | 269 ++++++++++++++++++ .../ops/completion-stream.ts | 16 +- .../sdk/server/rpc/handlers/cancelHandler.ts | 18 ++ 4 files changed, 465 insertions(+), 4 deletions(-) create mode 100644 .cursor/rules/sdk/docs/request-lifecycle-system.mdc create mode 100644 .cursor/rules/sdk/request-lifecycle-primitives.mdc diff --git a/.cursor/rules/sdk/docs/request-lifecycle-system.mdc b/.cursor/rules/sdk/docs/request-lifecycle-system.mdc new file mode 100644 index 0000000000..b1c86ed46a --- /dev/null +++ b/.cursor/rules/sdk/docs/request-lifecycle-system.mdc @@ -0,0 +1,166 @@ +--- +description: Request Lifecycle System - design rationale, migration roadmap, and FAQ for RequestRegistry / RequestContext / DisposableScope +alwaysApply: false +--- + +# Request Lifecycle System + +Companion to [`request-lifecycle-primitives.mdc`](../request-lifecycle-primitives.mdc). That rule fires automatically when editing server-side handlers and tells you *what* to do; this doc explains *why* the primitives are shaped this way, *what* they replace, and *where* the migration is heading. + +## Overview + +SDK 0.11.0 introduces three primitives that together give every server-side long-running operation a single owner for cancellation and cleanup: + +| Primitive | Lives in | Role | +|----------------------|-----------------------------------------------------------|---------------------------------------------------------------------------------------------------| +| `DisposableScope` | `server/bare/runtime/disposable-scope.ts` | LIFO cleanup container with `Symbol.asyncDispose` + error aggregation. | +| `RequestContext` | `server/bare/runtime/request-context.ts` | Per-request handle: `requestId`, `kind`, `modelId`, `signal`, `scope`, `state`. 
| +| `RequestRegistry` | `server/bare/runtime/request-registry.ts` (+ singleton) | Worker-scoped registry that mints `ManagedRequestContext`s and routes cancel by id or model. | + +Public surface is re-exported from `@/server/bare/runtime`. + +## What This Replaces + +Before 0.11.0, every long-running handler reinvented its own bookkeeping. Three concrete pain points motivated the primitives: + +### 1. Broad-cancel only + +The pre-0.11.0 wire contract was `cancel({ operation: "inference", modelId })` — every in-flight request on that model died together. Cancelling one tab's completion would also kill another tab's transcription if they shared a model. The new `cancel({ requestId })` path threads through to a single context. + +### 2. Drift between three KV-cache layers + +`server/bare/plugins/llamacpp-completion/ops/completion-stream.ts` previously coordinated three ad-hoc bookkeeping layers around cancel/error: + +1. An in-memory `Set` of "initialized caches" (`kv-cache-utils.ts`). +2. A `Map` of saved-message counts (`kv-cache-state.ts`). +3. The on-disk `.bin` files written by the addon. + +Each cancel/error branch had to: `unlink` the file, `clearCacheRegistry({ cacheKey, modelId })`, `cachedMessageCounts.delete(...)`. The three branches duplicated this trio; any one of them could drift from the others on a partial failure. Search for `clearCacheRegistry` in `completion-stream.ts` to see the duplication. + +`DisposableScope` collapses this into one place (commit-on-success, rollback through `Symbol.asyncDispose` on anything else). **M2** introduces `KvCacheSession` as the single owner of the three layers and replaces the duplicated cleanup blocks. + +### 3. Cancellation tracked via side counters + +The addon already exposes `addon.cancel()`. But the SDK's cancellation truth was a `cancelCounter` variable (incremented from the cancel handler) plus `signal.aborted` plus `producedTokens === 0` checks — a Venn diagram that handlers consulted in different ways. 
The signal is now the single source of truth: registry owns the controller, handler listens on `signal`, post-completion bookkeeping reads `signal.aborted` synchronously. + +## Why These Three Primitives (and not a framework) + +The brief landing this design was "give us a clean architectural layer for request lifecycle." Three options were on the table: + +### Option A: A lifecycle framework (react-reconciler / xstate / redux-saga / observable libs) + +Rejected. Heavyweight, unconventional inside the Bare worker, drags consumers along, overkill for server-side request lifecycle. The state-mgmt-dependencies discussion in early planning landed on "no framework." + +### Option B: Plain `AbortController` + ad-hoc `try/finally` scattered through handlers + +Rejected. This is the option that produces the "manual checks everywhere" failure mode — there's no cleanup primitive to lean on, so every handler reinvents its own. The cancel-counter / `producedTokens` / `fs.access`-probe pattern in pre-0.11.0 code is what this option looks like at scale. + +### Option C: Three tiny in-house primitives sitting on standard `AbortSignal` + `Symbol.asyncDispose` (chosen) + +The smallest thing that gives the auto-cleanup property a framework would give, without the framework footprint. ~100 LOC of domain-specific glue on top of platform primitives. The contract is "`AbortController` because it's the JS platform's actual contract for cancellation — `fetch`, `setTimeout`, every modern HTTP client, async-iterator hooks all use it — wrapped in three primitives that give us the structured behaviour a framework would otherwise provide." + +Concretely, Vercel's AI SDK (`streamText`) is a public-codebase example of the same shape — `AbortSignal` plumbed through, library-internal cleanup hooked off `signal.onabort` at a single leaf, no polling in user code. The primitives mirror that discipline for our internal handlers. 
+ +## Anti-Patterns Explicitly Avoided + +| Anti-pattern | What it produces | What to do instead | +|-------------------------------------------------------|-------------------------------------------------|---------------------------------------------------------------| +| `if (signal.aborted) return;` mid-handler | Cancel logic scattered, drifts from registry | Consume signal at the addon leaf; let propagation do its job | +| Manual cleanup in `if (signal.aborted) { ... }` | Duplicated rollback on cancel and error paths | `ctx.scope.defer(...)` or `await using` rollback | +| Side counter (`cancelCounter++`) for cancel tracking | Truth drifts from `signal.aborted` | Read `signal.aborted` synchronously when needed | +| Passing `AbortController` between handlers | Multiple owners, no single revoke | Handlers receive `ctx.signal` from `registry.begin(...)` | +| `throw new Error("cancelled")` | Loses partial state across RPC | Structured error (`InferenceCancelledError` in M2) | + +`request-lifecycle-primitives.mdc` has the worked code examples. + +## Migration Roadmap + +The full milestone breakdown lives in `tasks/release-0.11.0-planning/pitch-2-tasks.md` (workspace-local; not committed to the repo). 
Headline: + +| Milestone | Wire contract | Handlers migrated | +|-----------|-------------------------------------------------------------------------------|------------------------------------------------------------------------------------| +| **M1** | `cancel({ requestId })` lands; old `cancel({ modelId })` still works | llama.cpp completion | +| **M2** | Typed cancel outcomes; `KvCacheSession` replaces three-layer cancel bookkeeping; closes the same-tick cancel-before-begin race | (M2 introduces `InferenceCancelledError`; no new handlers) | +| **M3a** | Migrate embeddings handler | embeddings | +| **M3b** | Migrate audio/text handlers | transcribe, translate | +| **M3c** | Migrate decoder / OCR / TTS | diffusion, ocr, tts | +| **M3d** | Migrate downloadAsset / loadModel | downloadAsset, loadModel | + +Until a handler is registry-migrated, the broad-cancel path (`cancel({ operation: <kind>, modelId })`) falls back to `addon.cancel()` directly — see the fallback in `server/bare/ops/cancel.ts`. The wire contract for non-migrated kinds is therefore unchanged: callers continue to use `cancel({ operation: <kind>, modelId })` exactly as before, and behavior is preserved through the migration. + +## FAQ + +### Why is the registry a module-scoped singleton? + +The server-side SDK runs in exactly one Bare worker per host process. Per-request state must be reachable from RPC handlers, addon callbacks, and shutdown sweeps — all of which already share the worker globals (`getModel`, `cancelTransfer`, etc.). A singleton matches that scope. + +`createRequestRegistry()` is exported alongside `getRequestRegistry()` so unit tests can mint a fresh, isolated registry. Production code only ever uses the singleton. + +### Why are `requestId`s caller-provided rather than registry-minted? + +The client and server need to agree on the id *before* the request lands on the server, otherwise the synchronous `cancel({ requestId })` path doesn't work (client wouldn't have an id to send). 
Clients generate a UUIDv4, expose it on the result object (`run.requestId`), and pass it to the server in the request envelope. `RequestIdConflictError` (code 52417) guards the astronomically-unlikely collision. + +### Why caller-provided ids and not server-minted with a response? + +For streaming/long-running APIs, the client wants to know the `requestId` *before* the first response chunk so the UI's "stop" button is wired immediately. Returning a server-minted id in the first chunk forces a round-trip race. Caller-provided ids close that race. + +### What does `await using` actually do here? + +The `await using` declaration (from the TC39 Explicit Resource Management proposal, supported since TypeScript 5.2) calls the bound value's `[Symbol.asyncDispose]()` method when the enclosing scope exits — regardless of how it exits (normal return, throw, generator close, cancellation). For our `ManagedRequestContext`, that disposal: + +1. Sets `ctx.state` to the terminal outcome (derived from `signal.aborted` if not overridden). +2. Detaches the `parentSignal` listener so long-lived shutdown signals don't accumulate per-request handlers. +3. Removes the entry from the registry's `Map`. +4. Unwinds the `DisposableScope` (LIFO cleanups, aggregating errors). + +Everything is idempotent — calling `[Symbol.asyncDispose]()` twice (or `await using` followed by an explicit `registry.end(...)`) is safe. + +### Why a module-load guard for `Symbol.asyncDispose`? + +`Symbol.asyncDispose` is part of the Explicit Resource Management proposal, which is newer than ES2024. Older Bare builds (pre-V8 12.5) and some Expo SDK polyfills don't expose it. If the global symbol is `undefined`, the `[Symbol.asyncDispose]:` property key in our code coerces to the string `"undefined"`, producing objects that *look* async-disposable but aren't — handlers would silently leak registry entries forever. + +`disposable-scope.ts` checks `typeof Symbol.asyncDispose !== "symbol"` at import time and throws `AsyncDisposeUnavailableError` (code 53503) if it's missing. 
The server SDK runs only in Bare (regardless of whether the host app is Node / Bun / Electron / Expo / RN — the worker is always Bare), so the single guard covers the entire server-side surface. Every server entry-point imports `request-registry.ts`, which imports `disposable-scope.ts`, so the guard runs at the very first server-side import. + +### Does this work in Node? Bun? React Native? + +The primitives only need `AbortController` + `AbortSignal` + `Symbol.asyncDispose`. All three are standard in Node 22+, Bun 1.0+, and modern Bare. The server SDK is Bare-only by construction, so client-side imports (`@/server/bare/...`) don't apply — there's nothing for Node/Bun host apps to import directly. + +### How do I unit-test a handler that uses the registry? + +Use `createRequestRegistry()` to get a fresh registry in the test, pass it explicitly to the handler under test, and assert against `registry.list()` + `registry.get(requestId)`. See `test/unit/runtime/request-registry.test.ts` for the patterns. Production code uses the singleton (`getRequestRegistry()`), so the handler accepting an injected registry is a useful test seam. + +### What if I want to cancel by something other than `requestId` or `modelId`? + +Extend the `CancelTarget` discriminated union in `request-registry.ts`. Today supported shapes: `{ requestId }` and `{ modelId, kind? }`. If a third shape genuinely doesn't compose from those (e.g. cancel-by-user across multiple models), open a discussion before adding it — most legitimate cases are covered by combining `list()` + targeted cancels. + +### What's the relationship to the addon's `cancel`? + +The addon (`@qvac/llm-llamacpp` and friends) exposes a synchronous `cancel(jobId?)` that signals the C++ side to stop decoding ASAP. 
The SDK wires `signal.addEventListener("abort", onAbort, { once: true })` so a registry cancel fires `addon.cancel()` at the binding leaf — but the SDK's source of truth for "was this cancelled" is still `signal.aborted`, not addon state. This keeps the SDK decoupled from per-addon cancel semantics; an addon without a `cancel()` method still gets best-effort cancellation through the SDK's signal-propagated termination of the streaming loop. + +### Known gap: same-tick "cancel-before-begin" race + +If a client cancels via `cancel({ requestId })` before the server's `registry.begin({ requestId })` has run for that id, the cancel finds zero matches and returns. The later `begin(...)` opens a fresh context that the cancel didn't reach. M1 documents this with a tripwire unit test (`registry: same-tick cancel-before-begin returns 0 and does not retroactively abort the later begin()`); M2 closes the race with a bounded "cancelled-before-begin" set the registry consults from inside `begin(...)`. See `pitch-2-tasks.md` (M2 scope). 
+ +## Implementation Files + +| File | Purpose | +|-------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------| +| `server/bare/runtime/disposable-scope.ts` | `DisposableScope` factory + module-load `Symbol.asyncDispose` guard | +| `server/bare/runtime/request-context.ts` | `RequestContext`, `RequestKind`, `RequestState` types | +| `server/bare/runtime/request-registry.ts` | `createRequestRegistry()`, `ManagedRequestContext`, `RequestOutcome`, cancel/begin/end/list logic | +| `server/bare/runtime/request-registry-singleton.ts` | `getRequestRegistry()` worker-scoped accessor | +| `server/bare/runtime/request-id.ts` | UUID generation helper for caller-provided ids | +| `server/bare/runtime/index.ts` | Public re-exports — handlers import from `@/server/bare/runtime` | +| `server/bare/ops/cancel.ts` | Broad-cancel op: registry-routed with addon fallback for non-migrated handlers | +| `server/rpc/handlers/cancelHandler.ts` | RPC entry point: dispatches by `operation` (inference / embeddings / request / downloadAsset / rag) | +| `server/bare/plugins/llamacpp-completion/ops/completion-stream.ts`| First handler migrated — reference implementation of the canonical shape | +| `test/unit/runtime/disposable-scope.test.ts` | Scope contract: LIFO, idempotency, error aggregation, late-defer | +| `test/unit/runtime/request-registry.test.ts` | Registry contract: begin/cancel/end, parent-signal composition, listener detach, cancel-before-begin | +| `schemas/cancel.ts` | `CancelRequest` discriminated union + `cancelByRequestIdSugarSchema` for `sdk.cancel({ requestId })` | +| `utils/errors-server.ts` | `RequestIdConflictError` (52417), `AsyncDisposeUnavailableError` (53503) | + +## See Also + +- [`request-lifecycle-primitives.mdc`](../request-lifecycle-primitives.mdc) — canonical handler shape + anti-patterns (this rule fires automatically when editing server-side 
handlers). +- [`error-handling.mdc`](../error-handling.mdc) — structured-error placement and the client/server error separation. +- [`docs/kv-cache-system.mdc`](./kv-cache-system.mdc) — the bookkeeping layers that `KvCacheSession` (M2) will consolidate. diff --git a/.cursor/rules/sdk/request-lifecycle-primitives.mdc b/.cursor/rules/sdk/request-lifecycle-primitives.mdc new file mode 100644 index 0000000000..145eef0d7d --- /dev/null +++ b/.cursor/rules/sdk/request-lifecycle-primitives.mdc @@ -0,0 +1,269 @@ +--- +description: Request lifecycle primitives (RequestRegistry, RequestContext, DisposableScope, AbortSignal) - canonical handler shape and anti-patterns for SDK server-side cancellable operations +globs: + - packages/sdk/server/bare/**/*.ts + - packages/sdk/server/rpc/handlers/**/*.ts + - packages/sdk/client/api/cancel.ts + - packages/sdk/client/api/completion.ts +alwaysApply: false +--- + +# Request Lifecycle Primitives + +Server-side long-running operations (`completion`, `embeddings`, `transcribe`, `translate`, `loadModel`, `downloadAsset`, `rag`, etc.) all go through three primitives in `@/server/bare/runtime`: + +- **`DisposableScope`** — LIFO cleanup container disposed via `Symbol.asyncDispose`. +- **`RequestContext`** — per-request handle bundling `requestId`, `kind`, `modelId`, `signal`, `scope`, `state`. +- **`RequestRegistry`** — module-scoped registry that mints contexts via `begin(...)` and routes `cancel(...)` by `requestId` or `modelId`. + +Migration is rolling out across milestones (M1 ships completion; M2 adds typed cancel outcomes + `KvCacheSession`; M3 migrates embeddings / transcribe / translate / loadModel / downloadAsset). The contract below applies to every newly-migrated handler. 
+ +## Canonical Handler Shape + +Every cancellable server-side handler MUST follow this shape: + +```typescript +import { getRequestRegistry } from "@/server/bare/runtime"; + +async function* handleX(req: XRequest): AsyncGenerator<XEvent> { + const registry = getRequestRegistry(); + await using ctx = registry.begin({ + requestId: req.requestId, + kind: "completion", // or "embeddings", "transcribe", "translate", ... + modelId: req.modelId, + }); + + // Wire the abort signal at exactly ONE leaf — the addon binding. + const onAbort = () => { + const addon = getModel(req.modelId).addon; + if (addon?.cancel) { + addon.cancel.call(addon).catch((err) => { + logger.warn(`[cancel] addon.cancel rejected: ${String(err)}`); + }); + } + }; + ctx.signal.addEventListener("abort", onAbort, { once: true }); + if (ctx.signal.aborted) onAbort(); // parent-signal-already-aborted case + + try { + for await (const event of model.runStreaming(req, { signal: ctx.signal })) { + yield event; + } + } finally { + ctx.signal.removeEventListener("abort", onAbort); + } +} +``` + +Key invariants in this shape: + +1. **`await using ctx = registry.begin(...)`** — the registry mints a `ManagedRequestContext` whose `Symbol.asyncDispose` removes the entry and unwinds the scope. `await using` guarantees disposal on every exit path (return / throw / generator close / cancellation). +2. **Signal consumed at one place only** — the addon binding leaf. After that the addon throws or returns, the loop exits naturally, and the scope unwinds. +3. **`{ once: true }` listener + `finally` removeEventListener** — `{ once: true }` auto-removes if the signal fires; the `finally` is the cleanup hook for the signal-never-fired path. Both together mean no leaked listeners on long-lived parent signals. +4. **`if (ctx.signal.aborted) onAbort()` fall-through** — `addEventListener("abort", ..., { once: true })` does NOT fire if the signal is already aborted at register time. 
This line handles the case where the registry synchronously aborts a fresh controller because the `parentSignal` was already aborted at `begin(...)`. + +## DO NOT (Anti-Patterns) + +These patterns produce the bugs the lifecycle primitives exist to prevent. Reviewers will block PRs that introduce them. + +### Polling `signal.aborted` mid-handler + +```typescript +// WRONG — polling cancellation through the handler body +for await (const chunk of model.runStreaming(req, { signal })) { + if (signal.aborted) return; // <-- DO NOT + yield chunk; +} +``` + +Signal is consumed at exactly one point: the addon binding. After that, cancellation propagates by the addon returning / throwing, the loop exiting, and the scope unwinding. Polling scatters cancellation logic and gets out of sync with the truth (registry state). + +### Manual cleanup in `if (signal.aborted) { ... }` branches + +```typescript +// WRONG — duplicated cleanup on the cancel branch +const cachePath = await getCachePath(...); +const result = await model.run(...); +if (signal.aborted) { + await fs.unlink(cachePath); + clearCacheRegistry({ ... }); + return; // <-- DO NOT +} +await fs.unlink(cachePath); // also runs on happy path +``` + +Register cleanup with `ctx.scope.defer(...)` (or commit-on-success / rollback-on-anything-else with `await using`). Cleanup runs regardless of how the handler exits — the cancel branch is not special. + +### Tracking cancellation through a side counter + +```typescript +// WRONG — bookkeeping that drifts from real addon state +let cancelCounter = 0; +function onCancel() { cancelCounter++; } +// ... later +if (cancelCounter > 0) { /* assume cancelled */ } +``` + +The signal IS the source of truth. The registry owns it; the addon listens to it; everything else reads `signal.aborted` synchronously when needed (e.g. post-completion bookkeeping like `shouldRecordSavedCount(signal, producedTokens)`). 
+ +### Passing `AbortController` instances around + +```typescript +// WRONG — handlers should never see the controller +function handleX(req, controller: AbortController) { ... } +function handleY(req, signal: AbortSignal) { ... } // also wrong — get it from ctx +``` + +Controllers are owned by the registry. Handlers receive `ctx.signal` from `registry.begin(...)` and never construct or override a controller themselves. Cancellation always enters through `registry.cancel(...)` or `registry.cancelAll(...)`. + +### Throwing a plain `Error` on the cancel branch + +```typescript +// WRONG +if (signal.aborted) throw new Error("cancelled"); +``` + +Use a structured error from `@/utils/errors-server` — see `error-handling.mdc`. M2 will add a dedicated `InferenceCancelledError` (cancelled promise-aggregates carry partial state across the RPC boundary). Until then, the `events` stream simply ends and existing `CompletionFailedError` carries cancel-as-failure cases. + +## Cancel API Surface + +There are two cancel paths exposed to clients: + +### Targeted (preferred, new in 0.11.0) + +Cancel by `requestId`. Pair with the `requestId` field exposed on `CompletionRun` (and equivalent long-running result objects): + +```typescript +// Client side +const run = sdk.completion({ ... }); +console.log(run.requestId); // available synchronously + +// Later, from anywhere with access to the SDK client: +await sdk.cancel({ requestId: run.requestId }); +``` + +### Broad (escape hatch) + +Cancel every in-flight request matching a `modelId` — for model unload, app shutdown, admin sweeps. Kept stable from pre-0.11.0: + +```typescript +await sdk.cancel({ operation: "inference", modelId }); +await sdk.cancel({ operation: "embeddings", modelId }); +``` + +Internally, both paths land on `RequestRegistry.cancel(...)`. The broad path falls back to `addon.cancel()` for handler kinds that haven't been registry-migrated yet (everything except llama.cpp completion in 0.11.0). 
+ +## Primitives Reference + +All exports come from `@/server/bare/runtime`: + +```typescript +import { + // singleton accessor + getRequestRegistry, + + // factory (test code only — production uses the singleton) + createRequestRegistry, + + // scope factory (rarely used directly — `registry.begin(...)` carries one) + createDisposableScope, +} from "@/server/bare/runtime"; + +import type { + RequestRegistry, + RequestContext, + ManagedRequestContext, // RequestContext & AsyncDisposable + RequestKind, + RequestState, // "running" | "cancelling" | "completed" | "failed" | "cancelled" + RequestOutcome, // "completed" | "failed" | "cancelled" + BeginOpts, + CancelTarget, + CancelByRequestId, + CancelByModelId, + DisposableScope, +} from "@/server/bare/runtime"; +``` + +### `RequestRegistry` + +```typescript +interface RequestRegistry { + begin(opts: BeginOpts): ManagedRequestContext; + get(requestId: string): RequestContext | null; + list(): RequestContext[]; + cancel(target: CancelTarget): number; // count of contexts whose abort fired this call + cancelAll(reason: "shutdown" | "modelUnload"): Promise<void>; + end(requestId: string, outcome: RequestOutcome): Promise<void>; +} +``` + +`cancel(...)` returns the number of contexts cancelled by *this* call (already-cancelled contexts are skipped, so the count is "newly cancelled," safe to log as "n requests cancelled" once). + +### `DisposableScope` + +```typescript +interface DisposableScope { + defer(cleanup: () => Promise<void> | void): void; + [Symbol.asyncDispose](): Promise<void>; + readonly disposed: boolean; +} +``` + +Cleanups run in LIFO order on dispose. If multiple cleanups throw, an `AggregateError` aggregates them so no failure is silently dropped. Calling `defer` AFTER dispose runs the cleanup eagerly — resources never leak silently. 
+ +### Error Codes + +Two errors are owned by this stack today (both in `@/utils/errors-server`): + +| Code | Class | When | +|-------|--------------------------------|----------------------------------------------------------------------------| +| 52417 | `RequestIdConflictError` | `registry.begin(...)` called with a `requestId` already present. | +| 53503 | `AsyncDisposeUnavailableError` | Module-load guard: host runtime doesn't expose `Symbol.asyncDispose`. | + +M2 adds `InferenceCancelledError` for the typed cancel-outcome contract — promise-aggregates (`final`, `text`, `toolCalls`, `stats`) reject with it carrying the partial state, while the `events` stream ends normally with `stopReason: "cancelled"`. Until M2 ships, handlers fall back on `CompletionFailedError` / existing per-op errors. + +## Verification + +Two unit-test files pin the contract — read them as canonical examples: + +- `packages/sdk/test/unit/runtime/disposable-scope.test.ts` — LIFO cleanup, idempotency, error aggregation, late `defer` behavior. +- `packages/sdk/test/unit/runtime/request-registry.test.ts` — `begin`/`cancel`/`end` flow, `requestId` conflict detection, parent-signal composition + listener detach discipline, and the same-tick "cancel-before-begin" tripwire. + +When adding new behavior to the primitives, add the test before the implementation and pair it with the corresponding doc update in this rule. + +## Common Tasks + +### Migrating a new handler onto the registry + +1. Identify the `RequestKind` (`embeddings`, `transcribe`, `translate`, ...). +2. Replace any ad-hoc cancel bookkeeping (counters, flags, manual `signal.aborted` polling) with the canonical handler shape above. +3. Wire `ctx.signal` to the addon binding leaf with `addEventListener("abort", onAbort, { once: true })` + the `if (signal.aborted) onAbort()` fall-through + a `try/finally` removeEventListener. +4. Replace duplicated cleanup branches with `ctx.scope.defer(...)` or `await using` rollbacks. +5. 
Verify the broad-cancel path still works: `cancel({ operation: <kind>, modelId })` should land on `registry.cancel({ modelId, kind })` and propagate via the registered listener — no addon-level fallback needed once the kind is migrated. +6. Add a unit test that begins a request, fires `cancel({ requestId })`, asserts the addon was notified once, and asserts the registry slot is freed after dispose. + +### Adding a new `RequestKind` + +Open-coded union in `packages/sdk/server/bare/runtime/request-context.ts`: + +```typescript +export type RequestKind = + | "completion" + | "embeddings" + | "transcribe" + | "translate" + // ... add here + | "yourNewKind"; +``` + +Then thread it through any cancel handler that needs to broad-cancel by kind. Editor autocomplete will surface every call site that needs updating. + +### Adding a new cancel target + +Already covered by the `CancelTarget` discriminated union. If you genuinely need a new shape (e.g. `cancel({ tag: ... })`), extend `CancelTarget` in `request-registry.ts` and update `RequestRegistry.cancel`'s switch. + +## Related Rules + +- `error-handling.mdc` — `InferenceCancelledError` / `AsyncDisposeUnavailableError` / `RequestIdConflictError` placement and propagation across the RPC boundary. +- `docs/kv-cache-system.mdc` — KV-cache bookkeeping coupled to cancellation (`shouldRecordSavedCount`, cancel-branch rollback). `KvCacheSession` is M2 — until then handlers replicate the three-layer cleanup pattern around `signal.aborted`. +- `docs/request-lifecycle-system.mdc` — full reference (design rationale, migration roadmap, FAQ). 
diff --git a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts index 93d683022d..a1a142ccda 100644 --- a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts +++ b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts @@ -494,13 +494,21 @@ export async function* completion( // SDK still treats `signal.aborted` as the truth for cancel detection // (post-completion bookkeeping below) — this listener only shortens // the latency between "user clicked stop" and "addon stops decoding". - // Best-effort fire-and-forget: the addon's cancel resolves quickly and - // we can't await it from inside an event listener; the iterator below - // will see EOF/empty tokens once the C++ side returns. + // + // Fire-and-forget by construction (event listeners can't `await`), but + // `addon.cancel()` returns a Promise — if it ever rejects the bare + // `void` would leak it as an unhandledRejection. Attach `.catch(...)` + // so a rejection is logged and the process stays clean; the iterator + // below still sees EOF/empty tokens via the addon's normal cancel path + // so callers aren't affected. const onAbort = () => { const addon = model.addon; if (addon?.cancel) { - void addon.cancel.call(addon); + addon.cancel.call(addon).catch((err: unknown) => { + logger.warn( + `[cancel] addon.cancel() rejected during abort for modelId=${modelId}: ${err instanceof Error ? 
err.message : String(err)}`, + ); + }); } }; signal.addEventListener("abort", onAbort, { once: true }); diff --git a/packages/sdk/server/rpc/handlers/cancelHandler.ts b/packages/sdk/server/rpc/handlers/cancelHandler.ts index f0ee678ff1..2d889a352b 100644 --- a/packages/sdk/server/rpc/handlers/cancelHandler.ts +++ b/packages/sdk/server/rpc/handlers/cancelHandler.ts @@ -50,6 +50,24 @@ export async function cancelHandler( } break; } + default: { + // Exhaustiveness guard: if the `CancelRequest` union ever grows a + // new `operation` and this switch isn't updated, TypeScript fails + // here at compile time. At runtime the zod discriminated union in + // `cancelRequestSchema` is upstream, so reaching this branch means + // the schema and the handler have drifted — surface the + // mismatch as an explicit failure rather than a silent + // `success: true` no-op. + const _exhaustive: never = request; + void _exhaustive; + const op = (request as { operation?: string }).operation ?? "unknown"; + logger.error(`[cancel] unhandled cancel operation: ${op}`); + return { + type: "cancel", + success: false, + error: `Unhandled cancel operation: ${op}`, + }; + } } return {