diff --git a/deno.json b/deno.json index 5af6312..c533f63 100644 --- a/deno.json +++ b/deno.json @@ -89,6 +89,7 @@ "@probitas/reporter": "jsr:@probitas/reporter@^0.7.0", "@probitas/runner": "jsr:@probitas/runner@^0.5.0", "@std/assert": "jsr:@std/assert@^1.0.16", + "@std/cbor": "jsr:@std/cbor@^0.1.9", "@std/cli": "jsr:@std/cli@^1.0.25", "@std/collections": "jsr:@std/collections@^1.1.3", "@std/dotenv": "jsr:@std/dotenv@^0.225.6", diff --git a/deno.lock b/deno.lock index 81a844f..82fc4b1 100644 --- a/deno.lock +++ b/deno.lock @@ -59,6 +59,8 @@ "jsr:@std/async@^1.0.15": "1.0.16", "jsr:@std/bytes@1": "1.0.6", "jsr:@std/bytes@^1.0.6": "1.0.6", + "jsr:@std/cbor@*": "0.1.9", + "jsr:@std/cbor@~0.1.9": "0.1.9", "jsr:@std/cli@^1.0.25": "1.0.25", "jsr:@std/collections@1": "1.1.3", "jsr:@std/collections@^1.1.3": "1.1.3", @@ -76,6 +78,7 @@ "jsr:@std/path@0.217": "0.217.0", "jsr:@std/path@1": "1.1.4", "jsr:@std/path@^1.1.4": "1.1.4", + "jsr:@std/streams@^1.0.14": "1.0.16", "jsr:@std/streams@^1.0.16": "1.0.16", "jsr:@std/streams@^1.0.9": "1.0.16", "jsr:@std/testing@^1.0.16": "1.0.16", @@ -370,6 +373,13 @@ "@std/bytes@1.0.6": { "integrity": "f6ac6adbd8ccd99314045f5703e23af0a68d7f7e58364b47d2c7f408aeb5820a" }, + "@std/cbor@0.1.9": { + "integrity": "fe1f61f445a34c8f97973b58fecbfb24e48fcc88df7b1253d9b6fe5d2ea16936", + "dependencies": [ + "jsr:@std/bytes@^1.0.6", + "jsr:@std/streams@^1.0.14" + ] + }, "@std/cli@1.0.25": { "integrity": "1f85051b370c97a7a9dfc6ba626e7ed57a91bea8c081597276d1e78d929d8c91", "dependencies": [ @@ -1553,6 +1563,7 @@ "jsr:@probitas/reporter@0.7", "jsr:@probitas/runner@0.5", "jsr:@std/assert@^1.0.16", + "jsr:@std/cbor@~0.1.9", "jsr:@std/cli@^1.0.25", "jsr:@std/collections@^1.1.3", "jsr:@std/dotenv@~0.225.6", diff --git a/probitas/80-bigint.probitas.ts b/probitas/80-bigint.probitas.ts new file mode 100644 index 0000000..1657174 --- /dev/null +++ b/probitas/80-bigint.probitas.ts @@ -0,0 +1,20 @@ +/** + * BigInt Serialization Test Scenario + */ +import { scenario } 
from "jsr:@probitas/probitas@^0"; + +export default scenario("BigInt Serialization") + .step("Return BigInt value", () => { + // CBOR natively supports BigInt + return { + value: BigInt(9007199254740991), + timestamp: BigInt(Date.now()), + }; + }) + .step("This step should also run", (ctx) => { + // This step accesses the BigInt values + return { + received: typeof ctx.previous.value === "bigint", + }; + }) + .build(); diff --git a/probitas/81-function.probitas.ts b/probitas/81-function.probitas.ts new file mode 100644 index 0000000..ca7f852 --- /dev/null +++ b/probitas/81-function.probitas.ts @@ -0,0 +1,25 @@ +/** + * Function Serialization Test Scenario + */ +import { scenario } from "jsr:@probitas/probitas@^0"; + +export default scenario("Function Serialization") + .step("Return Function value", () => { + // Functions are serialized as placeholder via CBOR tagged value + return { + callback: () => "hello", + method: function namedFn() { + return 42; + }, + arrow: (x: number) => x * 2, + }; + }) + .step("This step should also run", (ctx) => { + // This step accesses the Function values + return { + hasCallback: typeof ctx.previous.callback === "function", + hasMethod: typeof ctx.previous.method === "function", + hasArrow: typeof ctx.previous.arrow === "function", + }; + }) + .build(); diff --git a/probitas/82-symbol.probitas.ts b/probitas/82-symbol.probitas.ts new file mode 100644 index 0000000..91e6a4c --- /dev/null +++ b/probitas/82-symbol.probitas.ts @@ -0,0 +1,23 @@ +/** + * Symbol Serialization Test Scenario + */ +import { scenario } from "jsr:@probitas/probitas@^0"; + +export default scenario("Symbol Serialization") + .step("Return Symbol value", () => { + // Symbols are serialized via CBOR tagged value + return { + id: Symbol("unique-id"), + tag: Symbol.for("global-tag"), + iterator: Symbol.iterator, + }; + }) + .step("This step should also run", (ctx) => { + // This step accesses the Symbol values + return { + hasId: typeof ctx.previous.id === "symbol", + 
hasTag: typeof ctx.previous.tag === "symbol", + hasIterator: typeof ctx.previous.iterator === "symbol", + }; + }) + .build(); diff --git a/probitas/83-circular.probitas.ts b/probitas/83-circular.probitas.ts new file mode 100644 index 0000000..e2d5f1c --- /dev/null +++ b/probitas/83-circular.probitas.ts @@ -0,0 +1,20 @@ +/** + * Circular Reference Serialization Test Scenario + */ +import { scenario } from "jsr:@probitas/probitas@^0"; + +export default scenario("Circular Reference Serialization") + .step("Return circular reference", () => { + // Circular references are handled via CBOR tagged value + const obj: Record<string, unknown> = { name: "parent" }; + obj.self = obj; + return obj; + }) + .step("This step should also run", (ctx) => { + // This step accesses the circular reference + return { + hasName: ctx.previous.name === "parent", + hasSelf: ctx.previous.self === ctx.previous, + }; + }) + .build(); diff --git a/probitas/84-undefined.probitas.ts b/probitas/84-undefined.probitas.ts new file mode 100644 index 0000000..2c3bb14 --- /dev/null +++ b/probitas/84-undefined.probitas.ts @@ -0,0 +1,28 @@ +/** + * Undefined Serialization Test Scenario + */ +import { scenario } from "jsr:@probitas/probitas@^0"; + +export default scenario("Undefined Serialization") + .step("Return undefined values", () => { + // CBOR natively supports undefined + return { + explicit: undefined, + nested: { + value: undefined, + defined: "exists", + }, + array: [1, undefined, 3], + }; + }) + .step("This step should also run", (ctx) => { + // This step accesses the undefined values + return { + hasExplicit: "explicit" in ctx.previous, + explicitIsUndefined: ctx.previous.explicit === undefined, + nestedValueIsUndefined: ctx.previous.nested.value === undefined, + nestedDefinedExists: ctx.previous.nested.defined === "exists", + arraySecondIsUndefined: ctx.previous.array[1] === undefined, + }; + }) + .build(); diff --git a/probitas/85-map-set.probitas.ts b/probitas/85-map-set.probitas.ts new file mode 100644
index 0000000..f769d9f --- /dev/null +++ b/probitas/85-map-set.probitas.ts @@ -0,0 +1,30 @@ +/** + * Map/Set Serialization Test Scenario + */ +import { scenario } from "jsr:@probitas/probitas@^0"; + +export default scenario("Map/Set Serialization") + .step("Return Map and Set values", () => { + // Map and Set are serialized via CBOR tagged value + const map = new Map([ + ["a", 1], + ["b", 2], + ]); + const set = new Set([1, 2, 3]); + return { + map, + set, + weakMap: new WeakMap(), + weakSet: new WeakSet(), + }; + }) + .step("This step should also run", (ctx) => { + // This step accesses the Map and Set values + return { + mapIsMap: ctx.previous.map instanceof Map, + mapSize: ctx.previous.map.size, + setIsSet: ctx.previous.set instanceof Set, + setSize: ctx.previous.set.size, + }; + }) + .build(); diff --git a/probitas/86-regexp.probitas.ts b/probitas/86-regexp.probitas.ts new file mode 100644 index 0000000..fb3d948 --- /dev/null +++ b/probitas/86-regexp.probitas.ts @@ -0,0 +1,24 @@ +/** + * RegExp Serialization Test Scenario + */ +import { scenario } from "jsr:@probitas/probitas@^0"; + +export default scenario("RegExp Serialization") + .step("Return RegExp values", () => { + // RegExp is serialized via CBOR tagged value + return { + pattern: /hello\s+world/gi, + email: new RegExp("^[a-z]+@[a-z]+\\.[a-z]+$", "i"), + unicode: /\p{Script=Hiragana}+/u, + }; + }) + .step("This step should also run", (ctx) => { + // This step accesses the RegExp values + return { + patternIsRegExp: ctx.previous.pattern instanceof RegExp, + patternSource: ctx.previous.pattern.source, + emailIsRegExp: ctx.previous.email instanceof RegExp, + unicodeFlags: ctx.previous.unicode.flags, + }; + }) + .build(); diff --git a/probitas/87-error.probitas.ts b/probitas/87-error.probitas.ts new file mode 100644 index 0000000..904f922 --- /dev/null +++ b/probitas/87-error.probitas.ts @@ -0,0 +1,31 @@ +/** + * Error Serialization Test Scenario + */ +import { scenario } from "jsr:@probitas/probitas@^0"; + 
+export default scenario("Error Serialization") + .step("Return Error values", () => { + // Error is serialized via CBOR tagged value (preserves name, message, stack, custom properties) + const error = new Error("Something went wrong"); + const typeError = new TypeError("Invalid type"); + const customError = Object.assign(new Error("Custom"), { + code: "ERR_CUSTOM", + details: { foo: "bar" }, + }); + return { + error, + typeError, + customError, + }; + }) + .step("This step should also run", (ctx) => { + // This step accesses the Error values + return { + errorIsError: ctx.previous.error instanceof Error, + errorMessage: ctx.previous.error.message, + errorHasStack: typeof ctx.previous.error.stack === "string", + typeErrorName: ctx.previous.typeError.name, + customErrorCode: ctx.previous.customError.code, + }; + }) + .build(); diff --git a/probitas/88-date.probitas.ts b/probitas/88-date.probitas.ts new file mode 100644 index 0000000..f4d6aaa --- /dev/null +++ b/probitas/88-date.probitas.ts @@ -0,0 +1,27 @@ +/** + * Date Serialization Test Scenario + */ +import { scenario } from "jsr:@probitas/probitas@^0"; + +export default scenario("Date Serialization") + .step("Return Date values", () => { + // CBOR natively supports Date + return { + now: new Date(), + epoch: new Date(0), + specific: new Date("2024-01-15T12:00:00Z"), + invalid: new Date("invalid"), + }; + }) + .step("This step should also run", (ctx) => { + // This step accesses the Date values + return { + nowIsDate: ctx.previous.now instanceof Date, + nowTime: ctx.previous.now.getTime(), + epochIsDate: ctx.previous.epoch instanceof Date, + epochTime: ctx.previous.epoch.getTime(), + specificYear: ctx.previous.specific.getUTCFullYear(), + invalidIsNaN: Number.isNaN(ctx.previous.invalid.getTime()), + }; + }) + .build(); diff --git a/probitas/89-typed-array.probitas.ts b/probitas/89-typed-array.probitas.ts new file mode 100644 index 0000000..95aa922 --- /dev/null +++ b/probitas/89-typed-array.probitas.ts @@ -0,0 
+1,29 @@ +/** + * TypedArray Serialization Test Scenario + */ +import { scenario } from "jsr:@probitas/probitas@^0"; + +export default scenario("TypedArray Serialization") + .step("Return TypedArray values", () => { + // TypedArrays are serialized via CBOR tagged value + return { + uint8: new Uint8Array([1, 2, 3, 4]), + int32: new Int32Array([100, 200, -300]), + float64: new Float64Array([1.5, 2.5, 3.5]), + buffer: new ArrayBuffer(8), + dataView: new DataView(new ArrayBuffer(4)), + }; + }) + .step("This step should also run", (ctx) => { + // This step accesses the TypedArray values + return { + uint8IsTypedArray: ctx.previous.uint8 instanceof Uint8Array, + uint8Length: ctx.previous.uint8.length, + uint8FirstValue: ctx.previous.uint8[0], + int32IsTypedArray: ctx.previous.int32 instanceof Int32Array, + float64Sum: ctx.previous.float64[0] + ctx.previous.float64[1], + bufferIsArrayBuffer: ctx.previous.buffer instanceof ArrayBuffer, + bufferByteLength: ctx.previous.buffer.byteLength, + }; + }) + .build(); diff --git a/src/cli/_templates/list.ts b/src/cli/_templates/list.ts index 476a694..dd39435 100644 --- a/src/cli/_templates/list.ts +++ b/src/cli/_templates/list.ts @@ -2,7 +2,7 @@ * Subprocess entry point for list command * * Loads scenarios and outputs metadata via TCP IPC. - * Communication is via NDJSON over TCP (not stdin/stdout). + * Communication is via CBOR over TCP (not stdin/stdout). 
* * @module * @internal @@ -10,80 +10,37 @@ import { loadScenarios } from "@probitas/core/loader"; import { applySelectors } from "@probitas/core/selector"; -import { toErrorObject } from "@core/errorutil/error-object"; import type { ListInput, ListOutput, ScenarioMeta } from "./list_protocol.ts"; -import { - closeIpc, - connectIpc, - parseIpcPort, - readInput, - writeOutput, -} from "./utils.ts"; - -/** - * Main entry point - */ -async function main(): Promise { - // Parse IPC port from command line arguments - const port = parseIpcPort(Deno.args); - - // Connect to parent process IPC server - const ipc = await connectIpc(port); - - try { - // Read input from IPC (TCP connection establishes readiness) - const input = await readInput(ipc) as ListInput; - - // Load scenarios from files - const scenarios = await loadScenarios(input.filePaths, { - onImportError: (file, err) => { - const m = err instanceof Error ? err.message : String(err); - throw new Error(`Failed to load scenario from ${file}: ${m}`); - }, - }); - - // Apply selectors to filter scenarios - const filtered = input.selectors.length > 0 - ? applySelectors(scenarios, input.selectors) - : scenarios; - - // Build output metadata - const scenarioMetas: ScenarioMeta[] = filtered.map((s) => ({ - name: s.name, - tags: s.tags, - steps: s.steps.filter((e) => e.kind === "step").length, - file: s.origin?.path || "unknown", - })); - - await writeOutput( - ipc, - { - type: "result", - scenarios: scenarioMetas, - } satisfies ListOutput, - ); - } catch (error) { - const err = error instanceof Error ? 
error : new Error(String(error)); - try { - await writeOutput( - ipc, - { - type: "error", - error: toErrorObject(err), - } satisfies ListOutput, - ); - } catch { - // Failed to write error to IPC, log to console as fallback - console.error("Subprocess error:", error); - } - } finally { - // Await close to ensure all pending writes are flushed - await closeIpc(ipc); - } -} - -// Run main and exit explicitly to avoid async operations keeping process alive -main().finally(() => { - // Ensure process exits after output is flushed - setTimeout(() => Deno.exit(0), 0); +import { runSubprocess, writeOutput } from "./utils.ts"; + +// Run subprocess with bootstrap handling +runSubprocess(async (ipc, input) => { + // Load scenarios from files + const scenarios = await loadScenarios(input.filePaths, { + onImportError: (file, err) => { + const m = err instanceof Error ? err.message : String(err); + throw new Error(`Failed to load scenario from ${file}: ${m}`); + }, + }); + + // Apply selectors to filter scenarios + const filtered = input.selectors.length > 0 + ? 
applySelectors(scenarios, input.selectors) + : scenarios; + + // Build output metadata + const scenarioMetas: ScenarioMeta[] = filtered.map((s) => ({ + name: s.name, + tags: s.tags, + steps: s.steps.filter((e) => e.kind === "step").length, + file: s.origin?.path || "unknown", + })); + + await writeOutput( + ipc, + { + type: "result", + scenarios: scenarioMetas, + } satisfies ListOutput, + ); }); diff --git a/src/cli/_templates/list_protocol.ts b/src/cli/_templates/list_protocol.ts index ca37a35..f4c1088 100644 --- a/src/cli/_templates/list_protocol.ts +++ b/src/cli/_templates/list_protocol.ts @@ -5,6 +5,7 @@ */ import type { ErrorObject } from "@core/errorutil/error-object"; +import { createOutputValidator } from "./utils.ts"; /** * Input sent to subprocess via IPC @@ -51,20 +52,10 @@ export interface ScenarioMeta { readonly file: string; } -/** - * Valid ListOutput type values - */ -const LIST_OUTPUT_TYPES = new Set(["result", "error"]); - /** * Type guard to check if a value is a valid ListOutput */ -export function isListOutput(value: unknown): value is ListOutput { - return ( - value !== null && - typeof value === "object" && - "type" in value && - typeof value.type === "string" && - LIST_OUTPUT_TYPES.has(value.type) - ); -} +export const isListOutput = createOutputValidator([ + "result", + "error", +]); diff --git a/src/cli/_templates/run.ts b/src/cli/_templates/run.ts index 3d97151..fd8b7cd 100644 --- a/src/cli/_templates/run.ts +++ b/src/cli/_templates/run.ts @@ -2,7 +2,7 @@ * Subprocess entry point for run command * * Executes scenarios and streams progress via TCP IPC. - * Communication is via NDJSON over TCP (not stdin/stdout). + * Communication is via CBOR over TCP (not stdin/stdout). 
* * @module * @internal @@ -19,15 +19,7 @@ import { serializeError, serializeRunResult, } from "./run_protocol.ts"; -import { - closeIpc, - configureLogging, - connectIpc, - type IpcConnection, - parseIpcPort, - readInput, - writeOutput, -} from "./utils.ts"; +import { configureLogging, runSubprocess, writeOutput } from "./utils.ts"; const logger = getLogger(["probitas", "cli", "run", "subprocess"]); @@ -60,11 +52,12 @@ globalThis.addEventListener( /** * Execute all scenarios + * + * This handler manages its own error handling because it needs to: + * 1. Clean up the abort controller on error + * 2. Write structured error output for scenario execution failures */ -async function runScenarios( - ipc: IpcConnection, - input: RunCommandInput, -): Promise { +runSubprocess(async (ipc, input) => { const { filePaths, selectors, @@ -142,44 +135,4 @@ async function runScenarios( } finally { globalAbortController = null; } -} - -/** - * Main entry point - */ -async function main(): Promise { - // Parse IPC port from command line arguments - const port = parseIpcPort(Deno.args); - - // Connect to parent process IPC server - const ipc = await connectIpc(port); - - try { - // Read input from IPC (TCP connection establishes readiness) - const input = await readInput(ipc) as RunCommandInput; - - await runScenarios(ipc, input); - } catch (error) { - try { - await writeOutput( - ipc, - { - type: "error", - error: serializeError(error), - } satisfies RunOutput, - ); - } catch { - // Failed to write error to IPC, log to console as fallback - console.error("Subprocess error:", error); - } - } finally { - // Await close to ensure all pending writes are flushed - await closeIpc(ipc); - } -} - -// Run main and exit explicitly to avoid LogTape keeping process alive -main().finally(() => { - // Ensure process exits after output is flushed - setTimeout(() => Deno.exit(0), 0); }); diff --git a/src/cli/_templates/run_protocol.ts b/src/cli/_templates/run_protocol.ts index e27d8a1..6248ca9 
100644 --- a/src/cli/_templates/run_protocol.ts +++ b/src/cli/_templates/run_protocol.ts @@ -12,12 +12,16 @@ import type { } from "@probitas/runner"; import type { ScenarioMetadata, StepMetadata } from "@probitas/core"; import type { LogLevel } from "@logtape/logtape"; +import { isErrorObject } from "@core/errorutil/error-object"; import { + createOutputValidator, + deserializeError, type ErrorObject, - fromErrorObject, - isErrorObject, - toErrorObject, -} from "@core/errorutil/error-object"; + serializeError, +} from "./utils.ts"; + +// Re-export for backward compatibility with consumers +export { deserializeError, type ErrorObject, serializeError } from "./utils.ts"; /** * Message sent from CLI to subprocess @@ -148,25 +152,15 @@ export interface RunStepEndOutput { } /** - * Serialize an error for transmission - */ -export function serializeError(error: unknown): ErrorObject { - const err = error instanceof Error ? error : new Error(String(error)); - return toErrorObject(err); -} - -/** - * Deserialize an error from transmission - */ -export function deserializeError(serialized: ErrorObject): Error { - return fromErrorObject(serialized); -} - -/** - * Serialize StepResult error for transmission + * Serialize StepResult for transmission + * + * Handles error serialization (for failed/skipped) to ensure proper + * cross-process Error transmission. Passed step values are transmitted + * as-is since CBOR streaming handles all complex types. */ export function serializeStepResult(result: StepResult): StepResult { if (result.status === "passed") { + // Value is transmitted directly - CBOR streaming handles all types return result; } return { @@ -193,10 +187,15 @@ export function serializeScenarioResult( } /** - * Deserialize StepResult error from transmission + * Deserialize StepResult from transmission + * + * Handles error deserialization (for failed/skipped) to restore + * Error instances from serialized ErrorObject. 
Passed step values + * are already deserialized by CBOR streaming. */ export function deserializeStepResult(result: StepResult): StepResult { if (result.status === "passed") { + // Value is already deserialized by CBOR streaming return result; } if (isErrorObject(result.error)) { @@ -313,9 +312,9 @@ export function createReporter( } /** - * Valid RunOutput type values + * Type guard to check if a value is a valid RunOutput */ -const RUN_OUTPUT_TYPES = new Set([ +export const isRunOutput = createOutputValidator([ "result", "error", "run_start", @@ -325,16 +324,3 @@ const RUN_OUTPUT_TYPES = new Set([ "step_start", "step_end", ]); - -/** - * Type guard to check if a value is a valid RunOutput - */ -export function isRunOutput(value: unknown): value is RunOutput { - return ( - value !== null && - typeof value === "object" && - "type" in value && - typeof value.type === "string" && - RUN_OUTPUT_TYPES.has(value.type) - ); -} diff --git a/src/cli/_templates/serializer.ts b/src/cli/_templates/serializer.ts new file mode 100644 index 0000000..25634e1 --- /dev/null +++ b/src/cli/_templates/serializer.ts @@ -0,0 +1,503 @@ +/** + * CBOR-based serializer for IPC communication + * + * Uses CBOR (Concise Binary Object Representation) for serialization, + * with tagged values for custom types that CBOR doesn't natively support. 
+ * + * CBOR natively supports: + * - null, undefined, boolean, number, bigint, string + * - Uint8Array (as byte string) + * - Date (via tag 0/1, automatically handled) + * - Arrays + * - Plain objects (as CBOR maps with string keys) + * + * Custom types are encoded using tagged values (tag 256+): + * - Symbol, Function, RegExp, Set, Map, Error + * - Circular references + * - Other TypedArrays, ArrayBuffer, DataView + * - WeakMap, WeakSet, WeakRef (markers only) + * + * @module + * @internal + */ + +import { + CborArrayDecodedStream, + CborByteDecodedStream, + CborMapDecodedStream, + type CborPrimitiveType, + type CborStreamInput, + type CborStreamOutput, + CborTag, + CborTextDecodedStream, + type CborType, +} from "@std/cbor"; + +/** + * Custom tag numbers for types not natively supported by CBOR + * Using private range (256-65535) to avoid conflicts with standard tags + */ +const Tag = { + /** Symbol type */ + Symbol: 256, + /** Symbol.for (global registry) */ + SymbolFor: 257, + /** Function placeholder */ + Function: 258, + // Note: 259 is reserved by @std/cbor (expects a map), skip it + /** Set collection */ + Set: 260, + /** Error object */ + Error: 261, + /** Circular reference */ + Circular: 262, + /** Int8Array */ + Int8Array: 263, + /** Uint16Array */ + Uint16Array: 264, + /** Int16Array */ + Int16Array: 265, + /** Uint32Array */ + Uint32Array: 266, + /** Int32Array */ + Int32Array: 267, + /** Float32Array */ + Float32Array: 268, + /** Float64Array */ + Float64Array: 269, + /** BigInt64Array */ + BigInt64Array: 270, + /** BigUint64Array */ + BigUint64Array: 271, + /** Uint8ClampedArray */ + Uint8ClampedArray: 272, + /** ArrayBuffer */ + ArrayBuffer: 273, + /** DataView */ + DataView: 274, + /** WeakMap (marker only) */ + WeakMap: 275, + /** WeakSet (marker only) */ + WeakSet: 276, + /** WeakRef (marker only) */ + WeakRef: 277, + /** RegExp object */ + RegExp: 278, + /** Map collection (as array of entries to support any key type) */ + Map: 279, +} as 
const; + +/** + * Get the tag number for a TypedArray + */ +function getTypedArrayTag(arr: ArrayBufferView): number | null { + // Note: Uint8Array is handled natively by CBOR + if (arr instanceof Int8Array) return Tag.Int8Array; + if (arr instanceof Uint16Array) return Tag.Uint16Array; + if (arr instanceof Int16Array) return Tag.Int16Array; + if (arr instanceof Uint32Array) return Tag.Uint32Array; + if (arr instanceof Int32Array) return Tag.Int32Array; + if (arr instanceof Float32Array) return Tag.Float32Array; + if (arr instanceof Float64Array) return Tag.Float64Array; + if (arr instanceof BigInt64Array) return Tag.BigInt64Array; + if (arr instanceof BigUint64Array) return Tag.BigUint64Array; + if (arr instanceof Uint8ClampedArray) return Tag.Uint8ClampedArray; + return null; +} + +/** + * Extract a clean ArrayBuffer from a Uint8Array view + * + * The Uint8Array's .buffer may be larger than the actual data due to + * CBOR decoder's internal buffer reuse. We must slice using byteOffset + * and byteLength to get only the relevant bytes. + */ +function extractBuffer(bytes: Uint8Array): ArrayBuffer { + const copy = new Uint8Array(bytes.byteLength); + copy.set(bytes); + return copy.buffer; +} + +/** + * Check if a value is a CBOR primitive type + * + * CborPrimitiveType includes: undefined, null, boolean, number, bigint, string, + * Uint8Array, and Date. These types are natively supported by CBOR. + */ +function isCborPrimitiveType(x: unknown): x is CborPrimitiveType { + if (x === null || x === undefined) return true; + const t = typeof x; + if (t === "boolean" || t === "number" || t === "bigint" || t === "string") { + return true; + } + if (x instanceof Uint8Array || x instanceof Date) return true; + return false; +} + +/** + * Convert a JavaScript value to CborStreamInput + * + * Handles JavaScript types that CBOR doesn't natively support by using + * tagged values. Supports circular references via path tracking. 
+ * + * @param value - Value to convert + * @returns CborStreamInput representation for use with CborSequenceEncoderStream + */ +export function toCborStreamInput(value: unknown): CborStreamInput { + const seen = new Map(); + + function convert(val: unknown, path: string): CborStreamInput { + // Handle CBOR primitives first (null, undefined, boolean, number, bigint, string, Uint8Array, Date) + if (isCborPrimitiveType(val)) { + return val; + } + + // Handle function and symbol (need tagging) + if (typeof val === "function") { + return new CborTag(Tag.Function, val.name || ""); + } + if (typeof val === "symbol") { + const key = Symbol.keyFor(val); + if (key !== undefined) { + return new CborTag(Tag.SymbolFor, key); + } + return new CborTag(Tag.Symbol, val.description ?? ""); + } + + // Check for circular reference + if (seen.has(val)) { + return new CborTag(Tag.Circular, seen.get(val)!); + } + + // Mark object as seen + seen.set(val, path); + + // Handle special object types + if (val instanceof RegExp) { + return new CborTag(Tag.RegExp, [val.source, val.flags]); + } + + if (val instanceof Set) { + const values: CborStreamInput[] = []; + let i = 0; + for (const v of val) { + values.push(convert(v, `${path}.values[${i}]`)); + i++; + } + return new CborTag(Tag.Set, values); + } + + if (val instanceof Map) { + // Use tagged array of entries to preserve Map identity on round-trip + // This supports any key type (unlike CBOR native maps which require string keys) + const entries: [CborStreamInput, CborStreamInput][] = []; + let i = 0; + for (const [k, v] of val) { + entries.push([ + convert(k, `${path}.entries[${i}][0]`), + convert(v, `${path}.entries[${i}][1]`), + ]); + i++; + } + return new CborTag(Tag.Map, entries); + } + + if (val instanceof WeakMap) { + return new CborTag(Tag.WeakMap, null); + } + + if (val instanceof WeakSet) { + return new CborTag(Tag.WeakSet, null); + } + + if (val instanceof WeakRef) { + return new CborTag(Tag.WeakRef, null); + } + + if (val 
instanceof Error) { + return new CborTag(Tag.Error, { + name: val.name, + message: val.message, + stack: val.stack, + // Preserve custom properties, recursively converted + ...Object.fromEntries( + Object.entries(val) + .filter(([k]) => !["name", "message", "stack"].includes(k)) + .map(([k, v]) => [k, convert(v, `${path}.${k}`)]), + ), + }); + } + + if (val instanceof DataView) { + return new CborTag( + Tag.DataView, + new Uint8Array(val.buffer, val.byteOffset, val.byteLength), + ); + } + + if (val instanceof ArrayBuffer) { + return new CborTag(Tag.ArrayBuffer, new Uint8Array(val)); + } + + // Handle TypedArrays (except Uint8Array which is handled by isCborPrimitiveType) + if (ArrayBuffer.isView(val) && !(val instanceof DataView)) { + const tagNumber = getTypedArrayTag(val); + if (tagNumber !== null) { + // Convert to Uint8Array for binary transmission + return new CborTag( + tagNumber, + new Uint8Array(val.buffer, val.byteOffset, val.byteLength), + ); + } + } + + // Handle arrays + if (Array.isArray(val)) { + return val.map((item, i) => convert(item, `${path}[${i}]`)); + } + + // Handle plain objects (encoded as CBOR map with string keys) + const result: Record<string, CborStreamInput> = {}; + for (const key of Object.keys(val)) { + result[key] = convert( + (val as Record<string, unknown>)[key], + `${path}.${key}`, + ); + } + return result; + } + + return convert(value, "$"); +} + +/** + * Convert CborType to a JavaScript value + * + * Restores special values that were serialized using tagged values.
+ * + * @param cborType - CborType to convert + * @returns Original value (or close approximation) + * @internal + */ +function fromCborType(cborType: CborType): unknown { + const refs = new Map(); + + function convert(val: CborType, path: string): unknown { + // Handle CBOR primitives (null, undefined, boolean, number, bigint, string, Uint8Array, Date) + if (isCborPrimitiveType(val)) { + return val; + } + + // Handle CborTag + if (val instanceof CborTag) { + const tag = val.tagNumber; + const content = val.tagContent as CborType; + + switch (tag) { + case Tag.Symbol: + return Symbol(content as string); + case Tag.SymbolFor: + return Symbol.for(content as string); + case Tag.Function: { + const rawName = typeof content === "string" ? content : ""; + // Validate function name to prevent code injection + const safeName = /^[A-Za-z_$][A-Za-z0-9_$]*$/.test(rawName) + ? rawName + : "anonymous"; + const fn = function () {}; + Object.defineProperty(fn, "name", { + value: safeName, + configurable: true, + }); + return fn; + } + case Tag.RegExp: { + const [source, flags] = content as [string, string]; + return new RegExp(source, flags); + } + case Tag.Set: { + const set = new Set(); + refs.set(path, set); + const values = content as CborType[]; + for (let i = 0; i < values.length; i++) { + set.add(convert(values[i], `${path}.values[${i}]`)); + } + return set; + } + case Tag.Map: { + const map = new Map(); + refs.set(path, map); + const entries = content as [CborType, CborType][]; + for (let i = 0; i < entries.length; i++) { + const [k, v] = entries[i]; + map.set( + convert(k, `${path}.entries[${i}][0]`), + convert(v, `${path}.entries[${i}][1]`), + ); + } + return map; + } + case Tag.Error: { + const obj = content as Record<string, CborType>; + const error = new Error(obj.message as string); + error.name = obj.name as string; + if (obj.stack) error.stack = obj.stack as string; + // Register error for potential circular references + refs.set(path, error); + // Restore custom properties,
recursively converting their values + const customProps: Record<string, unknown> = {}; + for (const [k, v] of Object.entries(obj)) { + if (!["name", "message", "stack"].includes(k)) { + customProps[k] = convert(v as CborType, `${path}.${k}`); + } + } + Object.assign(error, customProps); + return error; + } + case Tag.Circular: { + const refPath = content as string; + return refs.get(refPath); + } + case Tag.Int8Array: + return new Int8Array(extractBuffer(content as Uint8Array)); + case Tag.Uint16Array: + return new Uint16Array(extractBuffer(content as Uint8Array)); + case Tag.Int16Array: + return new Int16Array(extractBuffer(content as Uint8Array)); + case Tag.Uint32Array: + return new Uint32Array(extractBuffer(content as Uint8Array)); + case Tag.Int32Array: + return new Int32Array(extractBuffer(content as Uint8Array)); + case Tag.Float32Array: + return new Float32Array(extractBuffer(content as Uint8Array)); + case Tag.Float64Array: + return new Float64Array(extractBuffer(content as Uint8Array)); + case Tag.BigInt64Array: + return new BigInt64Array(extractBuffer(content as Uint8Array)); + case Tag.BigUint64Array: + return new BigUint64Array(extractBuffer(content as Uint8Array)); + case Tag.Uint8ClampedArray: + return new Uint8ClampedArray(extractBuffer(content as Uint8Array)); + case Tag.ArrayBuffer: + return extractBuffer(content as Uint8Array); + case Tag.DataView: + return new DataView(extractBuffer(content as Uint8Array)); + case Tag.WeakMap: + return new WeakMap(); + case Tag.WeakSet: + return new WeakSet(); + case Tag.WeakRef: + return { [Symbol.toStringTag]: "WeakRef (unrestorable)" }; + default: + // Unknown tag, return content as-is + return convert(content, path); + } + } + + // Handle arrays + if (Array.isArray(val)) { + const arr: unknown[] = []; + refs.set(path, arr); + for (let i = 0; i < val.length; i++) { + arr.push(convert(val[i], `${path}[${i}]`)); + } + return arr; + } + + // Handle plain objects (from CBOR maps) + const obj: Record<string, unknown> = {}; + refs.set(path, obj); +
for (const key of Object.keys(val)) {
+ obj[key] = convert(
+ (val as Record<string, CborType>)[key],
+ `${path}.${key}`,
+ );
+ }
+ return obj;
+ }
+
+ return convert(cborType, "$");
+}
+
+/**
+ * Consume CborStreamOutput and convert to regular CborType
+ *
+ * CborSequenceDecoderStream returns streaming types for arrays and maps.
+ * This function consumes those streams and returns regular values.
+ */
+async function consumeStreamOutput(item: CborStreamOutput): Promise<CborType> {
+ // Handle CBOR primitives first (null, undefined, boolean, number, bigint, string, Uint8Array, Date)
+ if (isCborPrimitiveType(item)) {
+ return item;
+ }
+
+ if (item instanceof CborTag) {
+ const consumed = await consumeStreamOutput(
+ item.tagContent as CborStreamOutput,
+ );
+ return new CborTag(item.tagNumber, consumed);
+ }
+
+ if (item instanceof CborArrayDecodedStream) {
+ const arr: CborType[] = [];
+ for await (const elem of item) {
+ arr.push(await consumeStreamOutput(elem));
+ }
+ return arr;
+ }
+
+ if (item instanceof CborMapDecodedStream) {
+ // CBOR maps become plain objects (JS Maps use Tag.Map)
+ const obj: Record<string, CborType> = {};
+ for await (const [key, value] of item) {
+ obj[key] = await consumeStreamOutput(value);
+ }
+ return obj;
+ }
+
+ if (item instanceof CborByteDecodedStream) {
+ const chunks: Uint8Array[] = [];
+ for await (const chunk of item) {
+ chunks.push(chunk);
+ }
+ const totalLength = chunks.reduce((sum, c) => sum + c.length, 0);
+ const result = new Uint8Array(totalLength);
+ let offset = 0;
+ for (const chunk of chunks) {
+ result.set(chunk, offset);
+ offset += chunk.length;
+ }
+ return result;
+ }
+
+ if (item instanceof CborTextDecodedStream) {
+ const chunks: string[] = [];
+ for await (const chunk of item) {
+ chunks.push(chunk);
+ }
+ return chunks.join("");
+ }
+
+ // Plain object - should not happen with streaming decoder, but handle just in case
+ console.debug(
+ "[serializer] Unexpected plain object in consumeStreamOutput:",
+ item,
+ );
+ return item as CborType;
+}
+
+/**
+ * Deserialize CborStreamOutput from CborSequenceDecoderStream
+ *
+ * Use this when decoding via CborSequenceDecoderStream for RFC 8742 compliance.
+ * Consumes streaming types and restores custom types from tagged values.
+ *
+ * @param item - CborStreamOutput from CborSequenceDecoderStream
+ * @returns Original value (or close approximation)
+ */
+export async function fromCborStreamOutput(
+ item: CborStreamOutput,
+): Promise<unknown> {
+ const cborType = await consumeStreamOutput(item);
+ return fromCborType(cborType);
+}
diff --git a/src/cli/_templates/serializer_test.ts b/src/cli/_templates/serializer_test.ts
new file mode 100644
index 0000000..7436859
--- /dev/null
+++ b/src/cli/_templates/serializer_test.ts
@@ -0,0 +1,425 @@
+/**
+ * Tests for CBOR-based serializer (streaming API)
+ */
+import { assertEquals, assertInstanceOf } from "@std/assert";
+import { describe, it } from "@std/testing/bdd";
+import {
+ CborSequenceDecoderStream,
+ CborSequenceEncoderStream,
+} from "@std/cbor";
+import { fromCborStreamOutput, toCborStreamInput } from "./serializer.ts";
+
+/**
+ * Helper to round-trip a value through streaming encode/decode
+ */
+async function roundTrip<T>(value: T): Promise<unknown> {
+ // Convert to CborStreamInput
+ const cborInput = toCborStreamInput(value);
+
+ // Create encoder and decoder streams
+ const encoder = new CborSequenceEncoderStream();
+ const decoder = new CborSequenceDecoderStream();
+
+ // Pipe encoder output to decoder input
+ const encodedStream = encoder.readable.pipeThrough(decoder);
+
+ // Write the input to the encoder
+ const writer = encoder.writable.getWriter();
+ await writer.write(cborInput);
+ await writer.close();
+
+ // Read the decoded output
+ const reader = encodedStream.getReader();
+ const result = await reader.read();
+ if (result.done) {
+ throw new Error("No output from decoder");
+ }
+
+ // Convert CborStreamOutput back to JavaScript value
+ return await fromCborStreamOutput(result.value);
+}
+
+describe("serializer
(CBOR streaming)", () => { + describe("primitives", () => { + it("handles null", async () => { + assertEquals(await roundTrip(null), null); + }); + + it("handles undefined", async () => { + assertEquals(await roundTrip(undefined), undefined); + }); + + it("handles boolean", async () => { + assertEquals(await roundTrip(true), true); + assertEquals(await roundTrip(false), false); + }); + + it("handles number", async () => { + assertEquals(await roundTrip(42), 42); + assertEquals(await roundTrip(3.14), 3.14); + assertEquals(await roundTrip(-100), -100); + }); + + it("handles string", async () => { + assertEquals(await roundTrip("hello"), "hello"); + assertEquals(await roundTrip(""), ""); + assertEquals(await roundTrip("日本語"), "日本語"); + }); + }); + + describe("BigInt", () => { + it("converts small BigInt to number (CBOR behavior)", async () => { + // CBOR decodes small bigints as numbers + assertEquals(await roundTrip(BigInt(123)), 123); + }); + + it("round-trips large BigInt", async () => { + // CBOR preserves bigint for values that don't fit in number + const large = BigInt("12345678901234567890"); + assertEquals(await roundTrip(large), large); + }); + + it("converts small negative BigInt to number (CBOR behavior)", async () => { + // CBOR decodes small bigints as numbers + assertEquals(await roundTrip(BigInt(-999)), -999); + }); + }); + + describe("Function", () => { + it("serializes and deserializes named function", async () => { + function namedFn() {} + const result = await roundTrip(namedFn); + assertEquals(typeof result, "function"); + }); + + it("serializes arrow function", async () => { + const arrow = () => {}; + const result = await roundTrip(arrow); + assertEquals(typeof result, "function"); + }); + + it("deserializes as placeholder function", async () => { + function myFunc() {} + const result = (await roundTrip(myFunc)) as () => void; + assertEquals(typeof result, "function"); + // Placeholder function does nothing + assertEquals(result(), undefined); + 
}); + }); + + describe("Symbol", () => { + it("round-trips Symbol with description", async () => { + const result = await roundTrip(Symbol("test")); + assertEquals(typeof result, "symbol"); + assertEquals((result as symbol).description, "test"); + }); + + it("round-trips Symbol.for correctly", async () => { + const original = Symbol.for("shared"); + const result = await roundTrip(original); + assertEquals(result, Symbol.for("shared")); + }); + + it("creates new Symbol for local symbols", async () => { + const original = Symbol("local"); + const result = await roundTrip(original); + assertEquals(typeof result, "symbol"); + assertEquals((result as symbol).description, "local"); + // Local symbols are NOT equal (they're recreated) + }); + }); + + describe("Date", () => { + it("round-trips Date", async () => { + const original = new Date("2024-06-15T18:30:00Z"); + const result = await roundTrip(original); + assertInstanceOf(result, Date); + assertEquals((result as Date).getTime(), original.getTime()); + }); + + it("round-trips epoch Date", async () => { + const original = new Date(0); + const result = await roundTrip(original); + assertInstanceOf(result, Date); + assertEquals((result as Date).getTime(), 0); + }); + }); + + describe("circular references", () => { + it("handles self-referencing object", async () => { + const obj: Record = { name: "parent" }; + obj.self = obj; + + const result = (await roundTrip(obj)) as Record; + + assertEquals(result.name, "parent"); + assertEquals(result.self, result); + }); + + it("handles mutually referencing objects", async () => { + const a: Record = { name: "a" }; + const b: Record = { name: "b" }; + a.ref = b; + b.ref = a; + + const result = (await roundTrip(a)) as Record; + const resultB = result.ref as Record; + + assertEquals(result.name, "a"); + assertEquals(resultB.name, "b"); + assertEquals(resultB.ref, result); + }); + + it("handles circular array", async () => { + const arr: unknown[] = [1, 2]; + arr.push(arr); + + const 
result = (await roundTrip(arr)) as unknown[]; + assertEquals(result[0], 1); + assertEquals(result[1], 2); + assertEquals(result[2], result); + }); + }); + + describe("Map", () => { + it("round-trips Map with string keys", async () => { + const original = new Map([ + ["x", 10], + ["y", 20], + ]); + const result = await roundTrip(original); + assertInstanceOf(result, Map); + assertEquals((result as Map).get("x"), 10); + assertEquals((result as Map).get("y"), 20); + }); + + it("round-trips Map with complex values", async () => { + const original = new Map([ + ["nested", { a: 1, b: 2 }], + ["array", [1, 2, 3]], + ]); + const result = (await roundTrip(original)) as Map; + assertInstanceOf(result, Map); + assertEquals(result.get("nested"), { a: 1, b: 2 }); + assertEquals(result.get("array"), [1, 2, 3]); + }); + + it("round-trips Map with non-string keys", async () => { + const original = new Map([ + [1, "one"], + [{ key: "obj" }, "object-key"], + ]); + const result = (await roundTrip(original)) as Map; + assertInstanceOf(result, Map); + assertEquals(result.get(1), "one"); + // Object keys become new objects, so we check by iteration + const entries = Array.from(result.entries()); + assertEquals(entries.length, 2); + assertEquals(entries[1][1], "object-key"); + }); + }); + + describe("Set", () => { + it("round-trips Set with primitives", async () => { + const original = new Set([1, 2, 3]); + const result = await roundTrip(original); + assertInstanceOf(result, Set); + assertEquals((result as Set).has(1), true); + assertEquals((result as Set).has(2), true); + assertEquals((result as Set).has(3), true); + assertEquals((result as Set).size, 3); + }); + + it("round-trips Set with strings", async () => { + const original = new Set(["a", "b", "c"]); + const result = await roundTrip(original); + assertInstanceOf(result, Set); + assertEquals((result as Set).has("a"), true); + assertEquals((result as Set).size, 3); + }); + }); + + describe("RegExp", () => { + it("round-trips simple 
RegExp", async () => { + const original = /test\d+/im; + const result = await roundTrip(original); + assertInstanceOf(result, RegExp); + assertEquals((result as RegExp).source, "test\\d+"); + assertEquals((result as RegExp).flags, "im"); + }); + + it("round-trips RegExp with special characters", async () => { + const original = /hello\s+world/gi; + const result = await roundTrip(original); + assertInstanceOf(result, RegExp); + assertEquals((result as RegExp).source, "hello\\s+world"); + assertEquals((result as RegExp).flags, "gi"); + }); + }); + + describe("Error", () => { + it("round-trips Error with message", async () => { + const original = new Error("test message"); + const result = await roundTrip(original); + assertInstanceOf(result, Error); + assertEquals((result as Error).message, "test message"); + assertEquals((result as Error).name, "Error"); + }); + + it("preserves custom error properties", async () => { + const original = Object.assign(new Error("test"), { + code: "ERR_TEST", + details: { foo: "bar" }, + }); + const result = (await roundTrip(original)) as Error & { + code: string; + details: { foo: string }; + }; + assertInstanceOf(result, Error); + assertEquals(result.message, "test"); + assertEquals(result.code, "ERR_TEST"); + assertEquals(result.details, { foo: "bar" }); + }); + }); + + describe("TypedArray", () => { + it("round-trips Uint8Array", async () => { + const original = new Uint8Array([10, 20, 30]); + const result = await roundTrip(original); + assertInstanceOf(result, Uint8Array); + assertEquals(Array.from(result as Uint8Array), [10, 20, 30]); + }); + + it("round-trips Int32Array", async () => { + const original = new Int32Array([100, -200, 300]); + const result = await roundTrip(original); + assertInstanceOf(result, Int32Array); + assertEquals(Array.from(result as Int32Array), [100, -200, 300]); + }); + + it("round-trips Float64Array", async () => { + const original = new Float64Array([1.5, 2.5, 3.5]); + const result = await 
roundTrip(original); + assertInstanceOf(result, Float64Array); + assertEquals(Array.from(result as Float64Array), [1.5, 2.5, 3.5]); + }); + + it("round-trips BigInt64Array", async () => { + const original = new BigInt64Array([BigInt(1), BigInt(-2), BigInt(3)]); + const result = await roundTrip(original); + assertInstanceOf(result, BigInt64Array); + assertEquals( + Array.from(result as BigInt64Array), + [BigInt(1), BigInt(-2), BigInt(3)], + ); + }); + }); + + describe("ArrayBuffer", () => { + it("round-trips ArrayBuffer", async () => { + const original = new Uint8Array([5, 6, 7, 8]).buffer; + const result = await roundTrip(original); + assertInstanceOf(result, ArrayBuffer); + assertEquals( + Array.from(new Uint8Array(result as ArrayBuffer)), + [5, 6, 7, 8], + ); + }); + }); + + describe("DataView", () => { + it("round-trips DataView", async () => { + const original = new DataView(new Uint8Array([1, 2, 3, 4]).buffer); + const result = await roundTrip(original); + assertInstanceOf(result, DataView); + assertEquals((result as DataView).byteLength, 4); + assertEquals((result as DataView).getUint8(0), 1); + }); + }); + + describe("WeakMap/WeakSet/WeakRef", () => { + it("round-trips WeakMap as new empty WeakMap", async () => { + const result = await roundTrip(new WeakMap()); + assertInstanceOf(result, WeakMap); + }); + + it("round-trips WeakSet as new empty WeakSet", async () => { + const result = await roundTrip(new WeakSet()); + assertInstanceOf(result, WeakSet); + }); + }); + + describe("complex nested structures", () => { + it("handles nested objects with special values", async () => { + const original = { + bigint: BigInt(123), + nested: { + date: new Date("2024-01-01"), + map: new Map([["key", "value"]]), + }, + array: [1, undefined, BigInt(456)], + }; + + const result = (await roundTrip(original)) as { + bigint: number; + nested: { date: Date; map: Map }; + array: [number, undefined, number]; + }; + + // Small bigints are converted to numbers by CBOR + 
assertEquals(result.bigint, 123); + assertInstanceOf(result.nested.date, Date); + assertInstanceOf(result.nested.map, Map); + assertEquals(result.nested.map.get("key"), "value"); + assertEquals(result.array[0], 1); + assertEquals(result.array[1], undefined); + assertEquals(result.array[2], 456); + }); + + it("handles array of mixed special types", async () => { + const original = [ + BigInt(1), + new Date("2024-01-01"), + new Set([1, 2]), + /test/g, + ]; + + const result = (await roundTrip(original)) as unknown[]; + + // Small bigints are converted to numbers by CBOR + assertEquals(result[0], 1); + assertInstanceOf(result[1], Date); + assertInstanceOf(result[2], Set); + assertInstanceOf(result[3], RegExp); + }); + + it("preserves undefined in object properties", async () => { + const original = { a: 1, b: undefined, c: 3 }; + const result = (await roundTrip(original)) as typeof original; + assertEquals(result, { a: 1, b: undefined, c: 3 }); + assertEquals("b" in result, true); + }); + + it("preserves undefined in arrays", async () => { + const original = [1, undefined, 3]; + const result = await roundTrip(original); + assertEquals(result, [1, undefined, 3]); + }); + }); + + describe("toCborStreamInput output type", () => { + it("returns CborStreamInput for objects", () => { + const result = toCborStreamInput({ foo: "bar" }); + // Should be a plain object (Record) + assertEquals(typeof result, "object"); + assertEquals(result !== null, true); + }); + + it("returns primitives directly", () => { + assertEquals(toCborStreamInput(42), 42); + assertEquals(toCborStreamInput("hello"), "hello"); + assertEquals(toCborStreamInput(null), null); + assertEquals(toCborStreamInput(undefined), undefined); + }); + }); +}); diff --git a/src/cli/_templates/utils.ts b/src/cli/_templates/utils.ts index 8506cae..da9e589 100644 --- a/src/cli/_templates/utils.ts +++ b/src/cli/_templates/utils.ts @@ -1,11 +1,21 @@ /** - * Shared utilities for subprocess templates + * Shared utilities for 
subprocess templates (Subprocess Side) * * These utilities are used by subprocess entry points (run.ts, list.ts) * for TCP-based IPC communication with the parent process. * - * Using TCP instead of stdin/stdout allows subprocess to use console.log - * freely without corrupting IPC messages. + * ## Architecture: Subprocess vs Parent IPC + * + * This module handles the **subprocess** side of IPC, while + * `../subprocess.ts` handles the **parent process** side. + * + * Key design decisions for subprocess side: + * - Pre-acquired `reader`/`writer` for consistent typed API + * - Automatic CBOR stream piping (encoder.readable → TCP, TCP → decoder) + * - `runSubprocess()` factory for standardized bootstrap + * - Simpler error handling (errors propagate to parent) + * + * See `../subprocess.ts` module docs for the full architecture explanation. * * IMPORTANT: This module must be self-contained with no relative imports * outside of _templates/. External dependencies are resolved at template @@ -15,23 +25,61 @@ * @internal */ -import { JsonParseStream } from "@std/json/parse-stream"; -import { TextLineStream } from "@std/streams"; +import { + CborSequenceDecoderStream, + CborSequenceEncoderStream, + type CborStreamInput, + type CborStreamOutput, +} from "@std/cbor"; +import { + type ErrorObject, + fromErrorObject, + toErrorObject, +} from "@core/errorutil/error-object"; +import { fromCborStreamOutput, toCborStreamInput } from "./serializer.ts"; export { configureLogging } from "./logging.ts"; +export type { ErrorObject } from "@core/errorutil/error-object"; + +/** + * Serialize an error for cross-process transmission + * + * Converts any error (or non-error value) to ErrorObject format + * that can be safely transmitted via CBOR. + * + * @param error - Error or value to serialize + * @returns Serialized error object + */ +export function serializeError(error: unknown): ErrorObject { + const err = error instanceof Error ? 
error : new Error(String(error));
+ return toErrorObject(err);
+}
+
+/**
+ * Deserialize an error from cross-process transmission
+ *
+ * Restores Error instance from ErrorObject format.
+ *
+ * @param serialized - Serialized error object
+ * @returns Restored Error instance
+ */
+export function deserializeError(serialized: ErrorObject): Error {
+ return fromErrorObject(serialized);
+}

 /**
 * IPC connection to parent process
+ *
+ * Provides pre-acquired reader/writer for CBOR-encoded communication.
+ * Both reader and writer are typed consistently for symmetric API.
 */
 export interface IpcConnection {
- /** Readable stream for receiving NDJSON from parent */
- readable: ReadableStream;
- /** Writer for sending NDJSON to parent (exclusive access) */
- writer: WritableStreamDefaultWriter;
- /** Text encoder for converting strings to bytes */
- encoder: TextEncoder;
+ /** CBOR-decoded reader (emits CborStreamOutput) */
+ readonly reader: ReadableStreamDefaultReader<CborStreamOutput>;
+ /** CBOR-encoded writer (accepts CborStreamInput) */
+ readonly writer: WritableStreamDefaultWriter<CborStreamInput>;
 /** Close the underlying TCP connection (async to properly flush writes) */
- close: () => Promise;
+ readonly close: () => Promise<void>;
 }

 /**
@@ -58,32 +106,51 @@ export function parseIpcPort(args: string[]): number {
 /**
 * Connect to parent process IPC server
 *
+ * Sets up bidirectional CBOR-encoded communication:
+ * - Reading: TCP bytes → CborSequenceDecoderStream → CborStreamOutput
+ * - Writing: CborStreamInput → CborSequenceEncoderStream → TCP bytes
+ *
 * @param port - TCP port to connect to
- * @returns IPC connection with typed streams
+ * @returns IPC connection with pre-acquired reader/writer
 */
 export async function connectIpc(port: number): Promise<IpcConnection> {
 const conn = await Deno.connect({ port, hostname: "127.0.0.1" });

- // Create NDJSON readable stream from connection
- const readable = conn.readable
- .pipeThrough(new TextDecoderStream())
- .pipeThrough(new TextLineStream())
- .pipeThrough(new
JsonParseStream()); + // Set up reading: TCP bytes → CBOR decoder → CborStreamOutput + const cborReadable = conn.readable.pipeThrough( + new CborSequenceDecoderStream(), + ); + const reader = cborReadable.getReader(); - // Get exclusive writer for the writable stream - const writer = conn.writable.getWriter(); + // Set up writing: CborStreamInput → CBOR encoder → TCP bytes + // We need to track the pipe completion to properly close the connection + const encoder = new CborSequenceEncoderStream(); + const pipePromise = encoder.readable.pipeTo(conn.writable); + const writer = encoder.writable.getWriter(); return { - readable, + reader, writer, - encoder: new TextEncoder(), close: async () => { + // Close writer first (signals no more data to encoder) try { - // Await writer.close() to ensure all pending writes are flushed await writer.close(); } catch { // Already closed or errored } + // Wait for pipe to complete (flushes all encoded data to TCP) + try { + await pipePromise; + } catch { + // Pipe may error if connection was closed by peer + } + // Release reader lock + try { + reader.releaseLock(); + } catch { + // Already released + } + // Close underlying connection try { conn.close(); } catch { @@ -96,30 +163,20 @@ export async function connectIpc(port: number): Promise { /** * Read input from parent process via IPC * - * Reads and parses the first JSON message from the IPC connection. 
- *
 * @param ipc - IPC connection
- * @returns Parsed input object
+ * @returns Decoded input object
 */
 export async function readInput(ipc: IpcConnection): Promise<unknown> {
- const reader = ipc.readable.getReader();
- try {
- const result = await reader.read();
- if (result.done) {
- throw new Error("IPC connection closed before receiving input");
- }
- return result.value;
- } finally {
- reader.releaseLock();
+ const result = await ipc.reader.read();
+ if (result.done) {
+ throw new Error("IPC connection closed before receiving input");
 }
+ // Convert CborStreamOutput to regular value with custom type restoration
+ return await fromCborStreamOutput(result.value);
 }

 /**
- * Write output to parent process via IPC (NDJSON format)
- *
- * Serializes the output object to JSON and writes to the IPC connection.
- * Each output is a single line for streaming.
- * Uses the exclusive writer from IpcConnection to ensure serialized writes.
+ * Write output to parent process via IPC
 *
 * @param ipc - IPC connection
 * @param output - Object to serialize and write
@@ -128,8 +185,11 @@ export async function writeOutput(
 ipc: IpcConnection,
 output: unknown,
): Promise<void> {
- const data = ipc.encoder.encode(JSON.stringify(output) + "\n");
- await ipc.writer.write(data);
+ // Convert to CborStreamInput (handles custom types via tagged values)
+ const cborInput = toCborStreamInput(output);
+
+ // Single atomic write (encoder transforms to CBOR bytes)
+ await ipc.writer.write(cborInput);
 }

 /**
@@ -140,3 +200,109 @@ export async function closeIpc(ipc: IpcConnection): Promise<void> {
 await ipc.close();
 }
+
+/**
+ * Create a type guard function for protocol output validation
+ *
+ * All subprocess outputs must have a `type` discriminator field.
+ * This factory creates a validator that checks:
+ * 1. Value is a non-null object
+ * 2. Has a `type` property that is a string
+ * 3.
`type` is one of the valid types
+ *
+ * @param validTypes - Array of valid type discriminator values
+ * @returns Type guard function
+ *
+ * @example
+ * ```ts ignore
+ * type MyOutput = { type: "result"; data: string } | { type: "error"; message: string };
+ * const isMyOutput = createOutputValidator<MyOutput>(["result", "error"]);
+ * ```
+ */
+export function createOutputValidator<T extends { type: string }>(
+ validTypes: readonly string[],
+): (value: unknown) => value is T {
+ const typeSet = new Set(validTypes);
+ return (value: unknown): value is T =>
+ value !== null &&
+ typeof value === "object" &&
+ "type" in value &&
+ typeof value.type === "string" &&
+ typeSet.has(value.type);
+}
+
+/**
+ * Handler function for subprocess execution
+ *
+ * Receives the IPC connection and parsed input, and is responsible
+ * for processing and writing success outputs. Errors thrown are
+ * caught by the bootstrap and written as error outputs.
+ */
+export type SubprocessHandler<I> = (
+ ipc: IpcConnection,
+ input: I,
+) => Promise<void>;
+
+/**
+ * Run a subprocess with standardized bootstrap logic
+ *
+ * This factory handles the common subprocess lifecycle:
+ * 1. Parse --ipc-port from command line arguments
+ * 2. Connect to parent process IPC server
+ * 3. Read input from IPC
+ * 4. Execute the handler
+ * 5. Catch errors and write error output
+ * 6. Close IPC connection
+ * 7. Exit process
+ *
+ * The handler is responsible for writing success outputs.
+ * Any errors thrown by the handler are caught and written as
+ * `{ type: "error", error: ErrorObject }`.
+ *
+ * @param handler - Function to process input and write outputs
+ *
+ * @example
+ * ```ts ignore
+ * runSubprocess<{ filePaths: string[] }>(async (ipc, input) => {
+ *   const result = await doWork(input.filePaths);
+ *   await writeOutput(ipc, { type: "result", data: result });
+ * });
+ * ```
+ */
+export function runSubprocess<I>(handler: SubprocessHandler<I>): void {
+ const main = async (): Promise<void> => {
+ // Parse IPC port from command line arguments
+ const port = parseIpcPort(Deno.args);
+
+ // Connect to parent process IPC server
+ const ipc = await connectIpc(port);
+
+ try {
+ // Read input from IPC (TCP connection establishes readiness)
+ const input = await readInput(ipc) as I;
+
+ // Execute handler (responsible for writing success outputs)
+ await handler(ipc, input);
+ } catch (error) {
+ // Write error output
+ try {
+ await writeOutput(ipc, {
+ type: "error",
+ error: serializeError(error),
+ });
+ } catch {
+ // Failed to write error to IPC, log to console as fallback
+ console.error("Subprocess error:", error);
+ }
+ } finally {
+ // Await close to ensure all pending writes are flushed
+ await closeIpc(ipc);
+ }
+ };
+
+ // Run main and exit explicitly to avoid async operations keeping process alive
+ main().finally(() => {
+ // Ensure process exits after output is flushed
+ setTimeout(() => Deno.exit(0), 0);
+ });
+}
diff --git a/src/cli/_templates/utils_test.ts b/src/cli/_templates/utils_test.ts
new file mode 100644
index 0000000..0d18da2
--- /dev/null
+++ b/src/cli/_templates/utils_test.ts
@@ -0,0 +1,232 @@
+/**
+ * Tests for subprocess template utilities
+ *
+ * @module
+ */
+
+import { assertEquals, assertInstanceOf, assertThrows } from "@std/assert";
+import { describe, it } from "@std/testing/bdd";
+import {
+ createOutputValidator,
+ deserializeError,
+ parseIpcPort,
+ serializeError,
+} from "./utils.ts";
+
+describe("utils", () => {
+ describe("serializeError", () => {
+ it("serializes Error objects", () => {
+ const error = new Error("test
message"); + const serialized = serializeError(error); + + assertEquals(serialized.name, "Error"); + assertEquals(serialized.message, "test message"); + assertEquals(typeof serialized.stack, "string"); + }); + + it("serializes custom error types", () => { + class CustomError extends Error { + constructor(message: string) { + super(message); + this.name = "CustomError"; + } + } + const error = new CustomError("custom message"); + const serialized = serializeError(error); + + assertEquals(serialized.name, "CustomError"); + assertEquals(serialized.message, "custom message"); + }); + + it("converts non-Error values to Error", () => { + const serialized = serializeError("string error"); + + assertEquals(serialized.name, "Error"); + assertEquals(serialized.message, "string error"); + }); + + it("handles null and undefined", () => { + assertEquals(serializeError(null).message, "null"); + assertEquals(serializeError(undefined).message, "undefined"); + }); + + it("handles objects", () => { + const serialized = serializeError({ code: 42 }); + + assertEquals(serialized.message, "[object Object]"); + }); + }); + + describe("deserializeError", () => { + it("restores Error from serialized object", () => { + const original = new Error("test message"); + const serialized = serializeError(original); + const restored = deserializeError(serialized); + + assertInstanceOf(restored, Error); + assertEquals(restored.message, "test message"); + }); + + it("preserves error name", () => { + class CustomError extends Error { + constructor(message: string) { + super(message); + this.name = "CustomError"; + } + } + const original = new CustomError("custom message"); + const serialized = serializeError(original); + const restored = deserializeError(serialized); + + assertEquals(restored.name, "CustomError"); + }); + + it("round-trips error correctly", () => { + const original = new TypeError("type error"); + const restored = deserializeError(serializeError(original)); + + 
assertEquals(restored.name, "TypeError"); + assertEquals(restored.message, "type error"); + }); + }); + + describe("parseIpcPort", () => { + it("parses valid port", () => { + const port = parseIpcPort(["--ipc-port", "8080"]); + assertEquals(port, 8080); + }); + + it("parses port with other arguments", () => { + const port = parseIpcPort([ + "--foo", + "bar", + "--ipc-port", + "3000", + "--baz", + ]); + assertEquals(port, 3000); + }); + + it("throws when --ipc-port is missing", () => { + assertThrows( + () => parseIpcPort(["--other", "arg"]), + Error, + "Missing required --ipc-port argument", + ); + }); + + it("throws when --ipc-port has no value", () => { + assertThrows( + () => parseIpcPort(["--ipc-port"]), + Error, + "Missing required --ipc-port argument", + ); + }); + + it("throws for invalid port value", () => { + assertThrows( + () => parseIpcPort(["--ipc-port", "invalid"]), + Error, + "Invalid --ipc-port value: invalid", + ); + }); + + it("throws for zero port", () => { + assertThrows( + () => parseIpcPort(["--ipc-port", "0"]), + Error, + "Invalid --ipc-port value: 0", + ); + }); + + it("throws for negative port", () => { + assertThrows( + () => parseIpcPort(["--ipc-port", "-1"]), + Error, + "Invalid --ipc-port value: -1", + ); + }); + + it("throws for port above 65535", () => { + assertThrows( + () => parseIpcPort(["--ipc-port", "65536"]), + Error, + "Invalid --ipc-port value: 65536", + ); + }); + + it("accepts maximum valid port", () => { + const port = parseIpcPort(["--ipc-port", "65535"]); + assertEquals(port, 65535); + }); + + it("accepts minimum valid port", () => { + const port = parseIpcPort(["--ipc-port", "1"]); + assertEquals(port, 1); + }); + }); + + describe("createOutputValidator", () => { + type TestOutput = + | { type: "result"; data: string } + | { type: "error"; message: string }; + + const isTestOutput = createOutputValidator(["result", "error"]); + + it("returns true for valid type", () => { + assertEquals(isTestOutput({ type: "result", 
data: "test" }), true); + assertEquals(isTestOutput({ type: "error", message: "fail" }), true); + }); + + it("returns false for invalid type", () => { + assertEquals(isTestOutput({ type: "unknown" }), false); + }); + + it("returns false for null", () => { + assertEquals(isTestOutput(null), false); + }); + + it("returns false for undefined", () => { + assertEquals(isTestOutput(undefined), false); + }); + + it("returns false for primitives", () => { + assertEquals(isTestOutput("string"), false); + assertEquals(isTestOutput(123), false); + assertEquals(isTestOutput(true), false); + }); + + it("returns false for object without type", () => { + assertEquals(isTestOutput({ data: "test" }), false); + }); + + it("returns false for non-string type", () => { + assertEquals(isTestOutput({ type: 123 }), false); + assertEquals(isTestOutput({ type: null }), false); + }); + + it("returns false for array", () => { + assertEquals(isTestOutput([]), false); + assertEquals(isTestOutput([{ type: "result" }]), false); + }); + + it("works with single valid type", () => { + const isSingleType = createOutputValidator<{ type: "only" }>(["only"]); + assertEquals(isSingleType({ type: "only" }), true); + assertEquals(isSingleType({ type: "other" }), false); + }); + + it("works with many valid types", () => { + const isManyTypes = createOutputValidator<{ type: string }>([ + "a", + "b", + "c", + "d", + "e", + ]); + assertEquals(isManyTypes({ type: "a" }), true); + assertEquals(isManyTypes({ type: "e" }), true); + assertEquals(isManyTypes({ type: "f" }), false); + }); + }); +}); diff --git a/src/cli/commands/list.ts b/src/cli/commands/list.ts index bab6035..714c83c 100644 --- a/src/cli/commands/list.ts +++ b/src/cli/commands/list.ts @@ -19,14 +19,7 @@ import { loadEnvironment, readAsset, } from "../utils.ts"; -import { - createNdjsonStream, - prepareSubprocessScript, - sendJsonInput, - spawnDenoSubprocess, - startIpcServer, - waitForIpcConnection, -} from "../subprocess.ts"; +import { 
runSubprocessToCompletion } from "../subprocess.ts"; import { isListOutput, type ListInput, @@ -227,59 +220,21 @@ async function runListSubprocess( allScenarios: ScenarioMeta[]; filteredScenarios: ScenarioMeta[]; }> { - // Start IPC server and get port - const { listener, port } = startIpcServer(); - - // Prepare subprocess script (resolve bare specifiers) - const templateUrl = new URL("../_templates/list.ts", import.meta.url); - const { scriptPath, tempDir } = await prepareSubprocessScript( - templateUrl, - "list", - ); - - // Spawn subprocess - const proc = spawnDenoSubprocess({ - denoArgs, - scriptPath, - cwd, - ipcPort: port, - }); - - // Wait for subprocess to connect (connection = ready) - // Race against subprocess exit to detect early failures - const ipc = await waitForIpcConnection(listener, { subprocess: proc }); - - // Create NDJSON stream from IPC connection - const outputStream = createNdjsonStream(ipc.readable, isListOutput); - - // Send list input to subprocess via IPC - await sendJsonInput( - ipc.writable, + return await runSubprocessToCompletion< + ListInput, + ListOutput, + { allScenarios: ScenarioMeta[]; filteredScenarios: ScenarioMeta[] } + >( { - filePaths, - selectors, - } satisfies ListInput, + templateUrl: new URL("../_templates/list.ts", import.meta.url), + templateName: "list", + input: { filePaths, selectors }, + denoArgs, + cwd, + }, + isListOutput, + (output) => Promise.resolve(handleSubprocessOutput(output)), ); - - try { - // Process output messages - for await (const output of outputStream) { - const result = handleSubprocessOutput(output); - if (result) { - return result; - } - } - - throw new Error("Subprocess ended without sending result"); - } finally { - // Wait for subprocess to exit first to allow proper cleanup. - // This prevents closing IPC while the subprocess is still writing. 
- await proc.status; - await ipc.close(); - listener.close(); - // Clean up temporary directory - await Deno.remove(tempDir, { recursive: true }).catch(() => {}); - } } /** diff --git a/src/cli/commands/run.ts b/src/cli/commands/run.ts index 77c45db..60867c4 100644 --- a/src/cli/commands/run.ts +++ b/src/cli/commands/run.ts @@ -22,14 +22,7 @@ import { resolveReporter, } from "../utils.ts"; import { createDiscoveryProgress } from "../progress.ts"; -import { - createNdjsonStream, - prepareSubprocessScript, - sendJsonInput, - spawnDenoSubprocess, - startIpcServer, - waitForIpcConnection, -} from "../subprocess.ts"; +import { cleanupSubprocess, startSubprocess } from "../subprocess.ts"; import { deserializeError, deserializeRunResult, @@ -270,42 +263,27 @@ async function runWithSubprocess( throw new Error("Aborted before execution started"); } - // Start IPC server and get port - const { listener, port } = startIpcServer(); - - // Prepare and spawn subprocess - const templateUrl = new URL("../_templates/run.ts", import.meta.url); - const { scriptPath, tempDir } = await prepareSubprocessScript( - templateUrl, - "run", - ); - const proc = spawnDenoSubprocess({ - denoArgs, - scriptPath, - cwd, - ipcPort: port, - signal, - }); - - // Wait for subprocess to connect (connection = ready) - // Race against subprocess exit to detect early failures - const ipc = await waitForIpcConnection(listener, { subprocess: proc }); - - // Create NDJSON stream from IPC connection - const outputStream = createNdjsonStream(ipc.readable, isRunOutput); - - // Send run input to subprocess via IPC - await sendJsonInput( - ipc.writable, + const { outputStream, resources } = await startSubprocess< + RunCommandInput, + RunOutput + >( { - type: "run", - filePaths, - selectors, - maxConcurrency, - maxFailures, - timeout, - logLevel, - } satisfies RunCommandInput, + templateUrl: new URL("../_templates/run.ts", import.meta.url), + templateName: "run", + input: { + type: "run", + filePaths, + selectors, 
+ maxConcurrency, + maxFailures, + timeout, + logLevel, + }, + denoArgs, + cwd, + signal, + }, + isRunOutput, ); try { @@ -334,14 +312,7 @@ async function runWithSubprocess( throw new Error("Subprocess ended without sending result"); } finally { - // Wait for subprocess to exit first to allow proper cleanup. - // This prevents closing IPC while the subprocess is still writing, - // which could cause "BadResource: Bad resource ID" errors. - await proc.status; - await ipc.close(); - listener.close(); - // Clean up temporary directory - await Deno.remove(tempDir, { recursive: true }).catch(() => {}); + await cleanupSubprocess(resources); } } diff --git a/src/cli/subprocess.ts b/src/cli/subprocess.ts index 7221464..04235f5 100644 --- a/src/cli/subprocess.ts +++ b/src/cli/subprocess.ts @@ -1,15 +1,45 @@ /** - * Subprocess utilities for scenario execution + * Subprocess utilities for scenario execution (Parent Process Side) * * Spawns deno subprocess with TCP-based IPC communication. * This avoids stdin/stdout pollution from user code (e.g., console.log). * + * ## Architecture: Parent vs Subprocess IPC + * + * This module handles the **parent process** side of IPC, while + * `_templates/utils.ts` handles the **subprocess** side. + * + * The two sides have intentionally different stream patterns: + * + * ### Parent Process (this module) + * - Uses raw `Uint8Array` streams from TCP connection + * - Manual CBOR encoding via `encodeToCbor()` for one-shot input sending + * - Manual CBOR decoding via `createCborStream()` for validated output streaming + * - Manages lifecycle: spawn → connect → send → receive → cleanup + * + * ### Subprocess (`_templates/utils.ts`) + * - Uses pre-acquired `reader`/`writer` with piped CBOR streams + * - Automatic encoding: write CborStreamInput → encoder → TCP + * - Automatic decoding: TCP → decoder → read CborStreamOutput + * - Simpler API since subprocess lifecycle is managed by parent + * + * This asymmetry exists because: + * 1. 
Parent needs fine-grained control for validation and multiple outputs + * 2. Subprocess can use simpler fire-and-forget streaming + * 3. Parent manages resources; subprocess just uses them + * * @module */ -import { JsonParseStream } from "@std/json/parse-stream"; +import { + CborSequenceDecoderStream, + CborSequenceEncoderStream, +} from "@std/cbor"; +import { + fromCborStreamOutput, + toCborStreamInput, +} from "./_templates/serializer.ts"; import { dirname, join } from "@std/path"; -import { TextLineStream } from "@std/streams"; import { type ResolvedTemplateFiles, resolveSubprocessTemplate, @@ -138,12 +168,12 @@ export function spawnDenoSubprocess( /** * IPC connection from subprocess * - * Provides read/write streams for NDJSON communication. + * Provides read/write streams for CBOR communication. */ export interface IpcConnection { - /** Readable stream for receiving NDJSON from subprocess */ + /** Readable stream for receiving data from subprocess */ readable: ReadableStream; - /** Writable stream for sending JSON to subprocess */ + /** Writable stream for sending data to subprocess */ writable: WritableStream; /** Close the connection */ close(): Promise; @@ -251,56 +281,309 @@ export interface SubprocessOutput { } /** - * Create NDJSON stream from IPC connection - * - * Transforms raw byte stream into parsed and validated JSON objects. 
+ * Create CBOR stream from IPC connection * * @typeParam T - Output type (must have `type` discriminator) * @param readable - Raw readable stream from IPC connection * @param validator - Type guard function to validate each output * @returns Typed readable stream of validated outputs */ -export function createNdjsonStream( +export function createCborStream( readable: ReadableStream, validator: (chunk: unknown) => chunk is T, ): ReadableStream { - return readable - .pipeThrough(new TextDecoderStream()) - .pipeThrough(new TextLineStream()) - .pipeThrough(new JsonParseStream()) - .pipeThrough( - new TransformStream({ - transform(chunk, controller) { - if (!validator(chunk)) { - throw new Error( - `Invalid subprocess output: ${JSON.stringify(chunk)}`, - ); + const cborStream = readable.pipeThrough(new CborSequenceDecoderStream()); + const reader = cborStream.getReader(); + + return new ReadableStream({ + async pull(controller) { + try { + const result = await reader.read(); + if (result.done) { + controller.close(); + return; + } + + // Convert CborStreamOutput to regular value with custom type restoration + const decoded = await fromCborStreamOutput(result.value); + + // Validate + if (!validator(decoded)) { + let decodedStr: string; + try { + decodedStr = JSON.stringify(decoded); + } catch { + decodedStr = String(decoded); } - controller.enqueue(chunk); - }, - }), - ); + throw new Error(`Invalid subprocess output: ${decodedStr}`); + } + + controller.enqueue(decoded); + } catch (error) { + controller.error(error); + } + }, + + cancel() { + reader.releaseLock(); + }, + }); } /** - * Send JSON input to subprocess via IPC + * Encode a value to CBOR bytes + * + * Uses CborSequenceEncoderStream to encode the value, then collects + * the output bytes. This provides standalone encoding without requiring + * a persistent stream connection. * - * Serializes input as JSON line and writes to IPC connection. - * Does NOT close the stream (allows for further communication). 
+ * IMPORTANT: Reading and writing must happen concurrently to avoid + * backpressure deadlock. The encoder may block writes until the + * readable side is being consumed. + * + * @param input - Value to encode + * @returns CBOR-encoded bytes + */ +async function encodeToCbor(input: unknown): Promise { + // Convert to CborStreamInput (handles custom types via tagged values) + const cborInput = toCborStreamInput(input); + + // Create encoder and get reader/writer + const encoder = new CborSequenceEncoderStream(); + const writer = encoder.writable.getWriter(); + const reader = encoder.readable.getReader(); + + // Start collecting output BEFORE writing to avoid backpressure deadlock + const collectPromise = (async () => { + const chunks: Uint8Array[] = []; + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + } + return chunks; + })(); + + // Write input and close to flush + await writer.write(cborInput); + await writer.close(); + + // Wait for collection to complete + const chunks = await collectPromise; + + // Concatenate chunks into single Uint8Array + const totalLength = chunks.reduce((sum, c) => sum + c.length, 0); + const result = new Uint8Array(totalLength); + let offset = 0; + for (const chunk of chunks) { + result.set(chunk, offset); + offset += chunk.length; + } + return result; +} + +/** + * Send CBOR input to subprocess via IPC * * @param writable - IPC writable stream * @param input - Input data to serialize and send */ -export async function sendJsonInput( +export async function sendCborInput( writable: WritableStream, input: unknown, ): Promise { - const encoder = new TextEncoder(); const writer = writable.getWriter(); try { - // Send as NDJSON (newline-delimited JSON) - await writer.write(encoder.encode(JSON.stringify(input) + "\n")); + // Encode to CBOR bytes + const cborData = await encodeToCbor(input); + + // Single atomic write + await writer.write(cborData); } finally { writer.releaseLock(); } } 
+ +/** + * Resources to clean up after subprocess execution + */ +export interface SubprocessResources { + /** Spawned subprocess */ + proc: Deno.ChildProcess; + /** IPC connection to subprocess */ + ipc: IpcConnection; + /** TCP listener for IPC */ + listener: Deno.Listener; + /** Temporary directory containing subprocess script */ + tempDir: string; +} + +/** + * Clean up subprocess resources in the correct order + * + * Order matters: + * 1. Wait for subprocess to exit (allows proper cleanup) + * 2. Close IPC connection (flushes pending writes) + * 3. Close TCP listener + * 4. Remove temporary directory + * + * @param resources - Resources to clean up + */ +export async function cleanupSubprocess( + resources: SubprocessResources, +): Promise { + const { proc, ipc, listener, tempDir } = resources; + + // Wait for subprocess to exit first to allow proper cleanup. + // This prevents closing IPC while the subprocess is still writing, + // which could cause "BadResource: Bad resource ID" errors. 
+ await proc.status; + await ipc.close(); + listener.close(); + // Clean up temporary directory (ignore errors if already removed) + await Deno.remove(tempDir, { recursive: true }).catch(() => {}); +} + +/** + * Options for running a subprocess + */ +export interface RunSubprocessOptions { + /** URL to the subprocess template file */ + templateUrl: URL; + /** Name of the template (for embedded templates lookup) */ + templateName: string; + /** Input to send to subprocess */ + input: I; + /** Additional deno arguments */ + denoArgs: string[]; + /** Current working directory */ + cwd: string; + /** Optional AbortSignal for cancellation */ + signal?: AbortSignal; +} + +/** + * Result of running a subprocess + */ +export interface SubprocessResult { + /** Output stream from subprocess */ + outputStream: ReadableStream; + /** Resources for cleanup (use cleanupSubprocess when done) */ + resources: SubprocessResources; +} + +/** + * Start a subprocess and return the output stream + * + * This is a low-level function that sets up the subprocess infrastructure. + * Caller is responsible for: + * 1. Iterating over outputStream to process messages + * 2. Calling cleanupSubprocess(resources) in a finally block + * + * For most use cases, prefer runSubprocessToCompletion() which handles + * the full lifecycle including cleanup. 
+ * + * @typeParam I - Input type + * @typeParam O - Output type (must have `type` discriminator) + * @param options - Subprocess options + * @param validator - Type guard to validate output messages + * @returns Output stream and resources for cleanup + */ +export async function startSubprocess( + options: RunSubprocessOptions, + validator: (value: unknown) => value is O, +): Promise> { + const { templateUrl, templateName, input, denoArgs, cwd, signal } = options; + + // Start IPC server and get port + const { listener, port } = startIpcServer(); + + // Prepare subprocess script (resolve bare specifiers) + const { scriptPath, tempDir } = await prepareSubprocessScript( + templateUrl, + templateName, + ); + + // Spawn subprocess + const proc = spawnDenoSubprocess({ + denoArgs, + scriptPath, + cwd, + ipcPort: port, + signal, + }); + + // Wait for subprocess to connect (connection = ready) + // Race against subprocess exit to detect early failures + const ipc = await waitForIpcConnection(listener, { subprocess: proc }); + + // Create CBOR stream from IPC connection + const outputStream = createCborStream(ipc.readable, validator); + + // Send input to subprocess via IPC + await sendCborInput(ipc.writable, input); + + return { + outputStream, + resources: { proc, ipc, listener, tempDir }, + }; +} + +/** + * Handler for subprocess output messages + * + * Process each output message and optionally return a final result. + * When a result is returned, the subprocess execution completes. + * + * @typeParam O - Output type + * @typeParam R - Result type + */ +export type SubprocessOutputHandler = (output: O) => Promise; + +/** + * Run a subprocess to completion + * + * This high-level function handles the full subprocess lifecycle: + * 1. Start subprocess and establish IPC connection + * 2. Send input to subprocess + * 3. Process output messages via handler + * 4. 
Clean up resources automatically + * + * @typeParam I - Input type + * @typeParam O - Output type (must have `type` discriminator) + * @typeParam R - Result type returned by handler + * @param options - Subprocess options + * @param validator - Type guard to validate output messages + * @param handler - Function to process each output message + * @returns Result from handler + * @throws Error if subprocess ends without returning a result + */ +export async function runSubprocessToCompletion< + I, + O extends SubprocessOutput, + R, +>( + options: RunSubprocessOptions, + validator: (value: unknown) => value is O, + handler: SubprocessOutputHandler, +): Promise { + const { outputStream, resources } = await startSubprocess(options, validator); + + try { + for await (const output of outputStream) { + const result = await handler(output); + if (result !== undefined) { + return result; + } + } + + // Handle abort case + if (options.signal?.aborted) { + throw new Error("Subprocess aborted"); + } + + throw new Error("Subprocess ended without sending result"); + } finally { + await cleanupSubprocess(resources); + } +}