Skip to content

Commit 31605bd

Browse files
QVAC-18183 feat[api]: cancel capability + per-handler cancel scope + structured logging
Lands the three M3a framework primitives so subsequent handler migration sub-PRs (M3b/M3c) have a single, declarative contract to slot into: 1. `PluginHandlerDefinition.cancel: { scope: "request" | "model" | "none"; hard?: boolean }` - Added to `schemas/plugin.ts` (`PluginHandlerCancel`, `PluginHandlerCancelScope`) + runtime schema validation on `pluginHandlerDefinitionRuntimeSchema`. - Declared on every built-in plugin manifest (llamacpp-completion, llamacpp-embedding, whispercpp/parakeet-transcription, nmtcpp-translation, onnx-tts/ocr, sdcpp-generation). The truth-table assignment is pinned by `test/unit/plugin-cancel-capability.test.ts`. 2. `RequestRegistry.policy({ kind, oneAtATimePerModel })`: - Admission control runs before scope/controller allocation in `begin(...)`. Rejecting a request raises `RequestRejectedByPolicyError` (52420) carrying `requestId`, `kind`, `modelId`, `reason` — re-exported from `@qvac/sdk` for `instanceof` checks. - The worker singleton installs `{ kind: "completion", oneAtATimePerModel: true }` on first access, matching the llama.cpp addon's single-decode-loop reality. 3. Structured `[request-lifecycle]` emits at begin/cancel/end: - Fixed log shape `requestId=<id> kind=<kind> modelId=<id|"-"> state=<state>` so `grep "requestId=abc"` returns the full per-request story chronologically. - `withRequestContext(logger, ctx)` extends the same prefix to handler-level emits; threaded through `completion(...)` and into `KvCacheSession` so KV-cache turn lifecycle shares the request's correlation tuple. - Single-cancel-emit guard suppresses duplicate cancel lines when `cancel({ requestId })` is invoked twice. Verification (from `packages/sdk/`): - `bun run lint` (eslint + tsc): clean. - `bun run test:unit`: 49 files / all asserts pass, including the 4 M3a test files (`plugin-cancel-capability` 7/7, `request-registry` 41/41, `request-lifecycle-logging` 6/6, `with-request-context` 5/5). Cursor rules updated alongside the code: - `request-lifecycle-primitives.mdc`: cancel-capability declaration table, concurrency-policy contract, structured-logging shape, error-codes table now carries 52420. - `docs/request-lifecycle-system.mdc`: migration-roadmap table reflects M3a shipped; three new FAQ entries explain *why* each primitive was chosen; implementation files table covers the new modules. - `error-handling.mdc`: 52420 row added. This PR is framework-only — no handler is migrated onto `registry.begin(...)` here beyond the completion handler that landed in M2. Handler migrations follow in M3b (inference handlers), M3c (non-inference / addon handlers), and M3d (CLI cancel bridge + cancelHandler retirement).
1 parent e2bc0e6 commit 31605bd

25 files changed

Lines changed: 1509 additions & 45 deletions

File tree

.cursor/rules/sdk/docs/request-lifecycle-system.mdc

Lines changed: 63 additions & 12 deletions
Large diffs are not rendered by default.

.cursor/rules/sdk/error-handling.mdc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ Located in `@/utils/errors-server`
180180
- `RequestIdConflictError` (52417) - `registry.begin(...)` called with a `requestId` already present
181181
- `RequestNotFoundError` (52418) - registry lookup miss (no in-flight request for the given id)
182182
- `InferenceCancelledError` (52419) - cancelled inference run; carries `requestId` + `partial: { text?, toolCalls?, stats? }`. Constructed client-side on `stopReason: "cancelled"` (event stream ends normally; promise-aggregates reject with this). Re-exported from `@qvac/sdk` for `instanceof` checks.
183+
- `RequestRejectedByPolicyError` (52420) - registry concurrency-policy admission failure (e.g. `oneAtATimePerModel`); carries `requestId`, `kind`, `modelId`, and a `reason` string. Re-exported from `@qvac/sdk` for `instanceof` checks. See `.cursor/rules/sdk/request-lifecycle-primitives.mdc` for the policy contract.
183184

184185
#### RAG Operations (52,800-52,999)
185186
- `RAGSaveFailedError` - Save failed

.cursor/rules/sdk/request-lifecycle-primitives.mdc

Lines changed: 183 additions & 9 deletions
Large diffs are not rendered by default.

packages/sdk/index.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,14 @@ export { SUPPORTED_AUDIO_FORMATS } from "./constants/audio";
153153
// promises. `InferenceCancelledError` rides the standard `QvacError`
154154
// envelope, but consumers reach for it through `instanceof` on
155155
// `await run.final` / `run.text` / `run.toolCalls` / `run.stats`
156-
// rejections.
156+
// rejections. `RequestRejectedByPolicyError` is thrown by
157+
// `RequestRegistry.begin(...)` when a registered concurrency policy
158+
// (e.g. `oneAtATimePerModel` on `completion`) rejects a new request;
159+
// it propagates out through the worker so the client can distinguish
160+
// "the request collided with another one" from "the request failed".
157161
export { InferenceCancelledError } from "./utils/errors-server";
158162
export type { InferenceCancelledPartial } from "./utils/errors-server";
163+
export { RequestRejectedByPolicyError } from "./utils/errors-server";
159164

160165
// Logging exports
161166
export { getLogger, SDK_LOG_ID } from "./logging";

packages/sdk/schemas/plugin.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,24 @@
11
import { z } from "zod";
22
import type { ModelSrcInput } from "./model-src-utils";
33

4+
/**
5+
* Granularity at which the addon can cancel.
6+
* - `"request"` — addon cancels a specific in-flight `requestId`.
7+
* - `"model"` — addon cancels whatever is running on the model.
8+
* - `"none"` — no addon cancel surface; SDK falls back to soft-cancel
9+
* (stop yielding, drop result; the C++ work runs to completion).
10+
*/
11+
export type PluginHandlerCancelScope = "request" | "model" | "none";
12+
13+
export interface PluginHandlerCancel {
14+
scope: PluginHandlerCancelScope;
15+
/**
16+
* `true` — `addon.cancel()` interrupts compute; otherwise it's
17+
* best-effort. Only meaningful for `scope: "model" | "request"`.
18+
*/
19+
hard?: boolean;
20+
}
21+
422
/**
523
* Definition for a plugin handler with explicit Zod schemas.
624
* Each handler must define its request/response schemas for validation.
@@ -21,6 +39,11 @@ export interface PluginHandlerDefinition<
2139
) => Promise<O> | AsyncGenerator<O>
2240
: never
2341
: never;
42+
/**
43+
* Cancel surface this handler advertises. Omitting is equivalent
44+
* to `{ scope: "none" }` (soft-cancel fallback).
45+
*/
46+
cancel?: PluginHandlerCancel;
2447
}
2548

2649
/**
@@ -43,6 +66,8 @@ export interface DuplexPluginHandlerDefinition<
4366
) => AsyncGenerator<O>
4467
: never
4568
: never;
69+
/** See `PluginHandlerDefinition.cancel`. */
70+
cancel?: PluginHandlerCancel;
4671
}
4772

4873
/**
@@ -254,13 +279,23 @@ const zodSchemaLikeRuntimeSchema = z
254279
})
255280
.catchall(z.unknown());
256281

282+
const pluginHandlerCancelRuntimeSchema = z
283+
.object({
284+
scope: z.enum(["request", "model", "none"], {
285+
error: "cancel.scope must be 'request', 'model', or 'none'",
286+
}),
287+
hard: z.boolean().optional(),
288+
})
289+
.catchall(z.unknown());
290+
257291
export const pluginHandlerDefinitionRuntimeSchema = z
258292
.object({
259293
requestSchema: zodSchemaLikeRuntimeSchema,
260294
responseSchema: zodSchemaLikeRuntimeSchema,
261295
streaming: z.boolean({ error: "streaming must be a boolean" }),
262296
duplex: z.boolean().optional(),
263297
handler: functionRuntimeSchema,
298+
cancel: pluginHandlerCancelRuntimeSchema.optional(),
264299
})
265300
.catchall(z.unknown());
266301

packages/sdk/schemas/sdk-errors-server.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export const SDK_SERVER_ERROR_CODES = {
4141
REQUEST_ID_CONFLICT: 52417,
4242
REQUEST_NOT_FOUND: 52418,
4343
INFERENCE_CANCELLED: 52419,
44+
REQUEST_REJECTED_BY_POLICY: 52420,
4445

4546
// RAG Operations (52,800-52,999)
4647
RAG_SAVE_FAILED: 52800,
@@ -309,6 +310,16 @@ const serverErrorDefinitions: ErrorCodesMap = {
309310
message: (requestId: string) =>
310311
`Inference request "${requestId}" was cancelled before it could complete`,
311312
},
313+
[SDK_SERVER_ERROR_CODES.REQUEST_REJECTED_BY_POLICY]: {
314+
name: "REQUEST_REJECTED_BY_POLICY",
315+
message: (
316+
requestId: string,
317+
kind: string,
318+
modelId: string,
319+
reason: string,
320+
) =>
321+
`Request "${requestId}" (kind: ${kind}, modelId: ${modelId}) was rejected by registry concurrency policy: ${reason}`,
322+
},
312323

313324
// RAG Operations (52,800-52,999)
314325
[SDK_SERVER_ERROR_CODES.RAG_SAVE_FAILED]: {

packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import { parseToolCalls } from "@/server/utils/tools";
4242
import { getResponseFormatJsonSchema } from "@/server/utils/response-format";
4343
import { buildAutoCacheSaveHistory, type CacheMessage } from "@/server/utils";
4444
import { getServerLogger } from "@/logging";
45+
import type { Logger } from "@/logging/types";
4546
import { AttachmentNotFoundError } from "@/utils/errors-server";
4647
import { nowMs } from "@/profiling";
4748
import {
@@ -415,11 +416,21 @@ export async function* completion(
415416
toolDialect?: ToolDialect;
416417
responseFormat?: ResponseFormat;
417418
},
418-
opts: { signal: AbortSignal; scope: DisposableScope },
419+
opts: {
420+
signal: AbortSignal;
421+
scope: DisposableScope;
422+
/**
423+
* Request-scoped logger forwarded to `createKvCacheSession` so
424+
* kv-cache lines share the request's lifecycle prefix. Falls
425+
* back to the module-level server logger when omitted.
426+
*/
427+
logger?: Logger;
428+
},
419429
): AsyncGenerator<{ token: string }, CompletionResult, unknown> {
420430
const { history, modelId, kvCache, tools, generationParams, responseFormat } =
421431
params;
422432
const { signal, scope } = opts;
433+
const requestLogger = opts.logger ?? logger;
423434

424435
const modelConfig = getModelConfig(modelId);
425436
const toolsEnabled = (modelConfig as { tools?: boolean }).tools === true;
@@ -470,7 +481,7 @@ export async function* completion(
470481
const addon = model.addon;
471482
if (addon?.cancel) {
472483
addon.cancel.call(addon).catch((err: unknown) => {
473-
logger.warn(
484+
requestLogger.warn(
474485
`[cancel] addon.cancel() rejected during abort for modelId=${modelId}: ${err instanceof Error ? err.message : String(err)}`,
475486
);
476487
});
@@ -523,7 +534,7 @@ export async function* completion(
523534
// rollback. Cancellations / zero-token replies / rename failures all
524535
// unwind through the same `scope.defer` hook. ----
525536

526-
const session = createKvCacheSession(modelId);
537+
const session = createKvCacheSession(modelId, { logger: requestLogger });
527538
const systemPromptFromHistory = extractSystemPrompt(history);
528539
// Dynamic mode lets each turn carry its own tool set, so the cache
529540
// hash must not depend on the tool list — otherwise a tool change

packages/sdk/server/bare/plugins/llamacpp-completion/ops/kv-cache-session.ts

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,12 @@ import {
1414
logCacheStatus,
1515
} from "@/server/bare/plugins/llamacpp-completion/ops/cache-logger";
1616
import { getServerLogger } from "@/logging";
17+
import type { Logger } from "@/logging/types";
1718

18-
const logger = getServerLogger();
19+
// Used by cross-model paths that have no `RequestContext` (e.g.
20+
// `deleteKvCacheState`). Per-session call sites receive a logger from
21+
// the caller — typically `withRequestContext(...)`.
22+
const moduleLogger = getServerLogger();
1923

2024
/**
2125
* Single owner of the three KV-cache bookkeeping layers.
@@ -204,7 +208,17 @@ interface InternalTurnState {
204208

205209
// ----- factory -----
206210

207-
export function createKvCacheSession(modelId: string): KvCacheSession {
211+
/**
212+
* Construct a session bound to one `(modelId, turn-owning request)`
213+
* scope. `options.logger` is the per-instance logger the session emits
214+
* through (typically `withRequestContext(getServerLogger(), ctx)`);
215+
* falls back to the module-scoped logger when omitted.
216+
*/
217+
export function createKvCacheSession(
218+
modelId: string,
219+
options?: { logger?: Logger },
220+
): KvCacheSession {
221+
const logger = options?.logger ?? moduleLogger;
208222
// Per-session map: each `TurnHandle` carries an opaque entry here. A
209223
// WeakMap so handles drop their state once the handler scope releases
210224
// the reference; the module-scoped maps above survive.
@@ -254,7 +268,7 @@ export function createKvCacheSession(modelId: string): KvCacheSession {
254268

255269
if (!exists) {
256270
await input.primeIfMissing(cachePath);
257-
await verifyPrimedFile(cachePath);
271+
await verifyPrimedFile(cachePath, logger);
258272
initializedCaches.add(registryKey);
259273
}
260274

@@ -297,7 +311,7 @@ export function createKvCacheSession(modelId: string): KvCacheSession {
297311

298312
if (!cacheExists) {
299313
await input.primeIfMissing(cachePath);
300-
await verifyPrimedFile(cachePath);
314+
await verifyPrimedFile(cachePath, logger);
301315
initializedCaches.add(registryKey);
302316
}
303317

@@ -453,9 +467,9 @@ export async function deleteKvCacheState(
453467
const removed = await deleteCacheUtil({ all: true });
454468
cachedMessageCounts.clear();
455469
initializedCaches.clear();
456-
// `removed` is the kv-cache root dir; logging surfaces it for
457-
// ops visibility but isn't part of the contract.
458-
logger.debug(`[kv-cache] Cleared all caches under ${removed}`);
470+
// `removed` is the kv-cache root dir; surfaces it for ops
471+
// visibility but isn't part of the contract.
472+
moduleLogger.debug(`[kv-cache] Cleared all caches under ${removed}`);
459473
return;
460474
}
461475

@@ -512,7 +526,10 @@ export async function deleteKvCacheState(
512526
* `completion-stream.ts` lets the error propagate up and no
513527
* `initializedCaches` entry is recorded.
514528
*/
515-
async function verifyPrimedFile(cachePath: string): Promise<void> {
529+
async function verifyPrimedFile(
530+
cachePath: string,
531+
logger: Logger,
532+
): Promise<void> {
516533
let stats: { size: number };
517534
try {
518535
stats = await fsPromises.stat(cachePath);

packages/sdk/server/bare/plugins/llamacpp-completion/plugin.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,12 @@ import { attachModelExecutionMs } from "@/profiling/model-execution";
3030
import { getModelConfig } from "@/server/bare/registry/model-registry";
3131
import { createCompletionNormalizer } from "@/server/utils/completion-normalizer";
3232
import { detectToolDialect } from "@/server/utils/tool-integration";
33-
import { getRequestRegistry } from "@/server/bare/runtime";
33+
import {
34+
getRequestRegistry,
35+
withRequestContext,
36+
} from "@/server/bare/runtime";
3437
import { generateServerRequestId } from "@/server/bare/runtime/request-id";
38+
import { getServerLogger } from "@/logging";
3539

3640

3741
function createLlmModel(
@@ -96,6 +100,7 @@ export const llmPlugin = definePlugin({
96100
requestSchema: completionStreamRequestSchema,
97101
responseSchema: completionStreamResponseSchema,
98102
streaming: true,
103+
cancel: { scope: "model", hard: true },
99104

100105
handler: async function* (request) {
101106
const filteredHistory = request.history.map(
@@ -141,6 +146,8 @@ export const llmPlugin = definePlugin({
141146
modelId: request.modelId,
142147
});
143148

149+
const requestLogger = withRequestContext(getServerLogger(), ctx);
150+
144151
const stream = completion(
145152
{
146153
history: filteredHistory,
@@ -151,7 +158,7 @@ export const llmPlugin = definePlugin({
151158
...(toolsActive && { toolDialect: dialect }),
152159
...(request.responseFormat && { responseFormat: request.responseFormat }),
153160
},
154-
{ signal: ctx.signal, scope: ctx.scope },
161+
{ signal: ctx.signal, scope: ctx.scope, logger: requestLogger },
155162
);
156163

157164
try {
@@ -210,6 +217,7 @@ export const llmPlugin = definePlugin({
210217
requestSchema: finetuneRequestSchema,
211218
responseSchema: finetuneResponseSchema,
212219
streaming: false,
220+
cancel: { scope: "none" },
213221

214222
handler: function (request) {
215223
return finetune(request);
@@ -220,6 +228,7 @@ export const llmPlugin = definePlugin({
220228
requestSchema: translateRequestSchema,
221229
responseSchema: translateResponseSchema,
222230
streaming: true,
231+
cancel: { scope: "model", hard: true },
223232

224233
handler: async function* (request) {
225234
const stream = translate(request);

packages/sdk/server/bare/plugins/llamacpp-embedding/plugin.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@ export const embeddingsPlugin = definePlugin({
110110
requestSchema: embedRequestSchema,
111111
responseSchema: embedResponseSchema,
112112
streaming: false,
113+
// Model-wide hard cancel via `addon.cancel()` on the llama.cpp
114+
// embedding addon. Compute is interrupted when fired.
115+
cancel: { scope: "model", hard: true },
113116

114117
handler: async function (request) {
115118
const embedResult = await embed({

0 commit comments

Comments
 (0)