Skip to content
196 changes: 195 additions & 1 deletion packages/core/src/agent/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ import {
} from "./streaming/guardrail-stream";
import { SubAgentManager } from "./subagent";
import type { SubAgentConfig } from "./subagent/types";
import type { VoltAgentTextStreamPart } from "./subagent/types";
import { SUBAGENT_METADATA_KEYS } from "./subagent/types";
import type { SubagentMetadata, VoltAgentTextStreamPart } from "./subagent/types";
import type {
AgentEvalConfig,
AgentEvalOperationType,
Expand Down Expand Up @@ -233,6 +234,124 @@ const callToolParameters = z.object({
// Types
// ============================================================================

/**
* Extracts subagent metadata fields from a stream part
*/
function extractSubagentMetadata(part: VoltAgentTextStreamPart): SubagentMetadata {
const metadata: Record<string, unknown> = {};
for (const key of SUBAGENT_METADATA_KEYS) {
if (part[key] != null) {
metadata[key] = part[key];
}
}
return metadata as SubagentMetadata;
}

/**
* Base UIMessageChunk with subagent metadata fields.
* Uses a permissive base that allows UIMessageStream to accept our extended chunks.
*/
export type UIMessageChunkWithMetadata = {
type: string;
[key: string]: unknown;
} & SubagentMetadata;

/**
* Converts a fullStream part to UIMessageChunk format with subagent metadata
* @internal Exported for testing purposes
*/
export function convertFullStreamPartToUIMessageChunk(
part: VoltAgentTextStreamPart,
options: FullStreamToUIMessageStreamOptions,
): UIMessageChunkWithMetadata | undefined {
// Check if this event type should be included based on the types filter
if (options.types && options.types.length > 0 && !options.types.includes(part.type)) {
return undefined;
}

const meta = extractSubagentMetadata(part);

switch (part.type) {
case "text-delta":
return { type: "text-delta", id: part.id, delta: part.text, ...meta };

case "reasoning-delta":
if (!options.sendReasoning) return undefined;
return { type: "reasoning-delta", id: part.id, delta: part.text, ...meta };

case "source":
if (!options.sendSources) return undefined;
return { ...part, ...meta };

case "tool-call":
return {
type: "tool-input-available",
toolCallId: part.toolCallId,
toolName: part.toolName,
input: "input" in part ? part.input : (part as Record<string, unknown>).args,
...meta,
};

case "tool-result":
return {
type: "tool-output-available",
toolCallId: part.toolCallId,
output: "output" in part ? part.output : (part as Record<string, unknown>).result,
...meta,
};

case "tool-input-start":
return {
type: "tool-input-start",
toolCallId: part.id,
toolName: part.toolName,
...meta,
};

case "tool-input-delta":
return {
type: "tool-input-delta",
toolCallId: part.id,
inputTextDelta: part.delta,
...meta,
};

case "tool-error":
return {
type: "tool-output-error",
toolCallId: part.toolCallId,
errorText: String(part.error),
...meta,
};

case "start-step":
if (!options.sendStart) return undefined;
return { type: "start-step", ...meta };

case "finish-step":
if (!options.sendFinish) return undefined;
return { type: "finish-step", finishReason: part.finishReason, usage: part.usage, ...meta };

case "start":
if (!options.sendStart) return undefined;
return { type: "start", ...meta };

case "finish":
if (!options.sendFinish) return undefined;
return { type: "finish", finishReason: part.finishReason, usage: part.totalUsage, ...meta };

case "error":
return {
type: "error",
error: options.onError ? options.onError(part.error) : String(part.error),
...meta,
};

default:
return undefined;
}
}

export type OutputSpec = Output.Output<unknown, unknown>;
type OutputValue<OUTPUT extends OutputSpec> = InferGenerateOutput<OUTPUT>;

Expand Down Expand Up @@ -271,6 +390,24 @@ function sanitizeConversationTitle(text: string, maxLength: number): string {
/**
* Extended StreamTextResult that includes context
*/
/**
* Options for fullStreamToUIMessageStream conversion
*/
export type FullStreamToUIMessageStreamOptions = {
/** Include reasoning/thinking content in the stream */
sendReasoning?: boolean;
/** Include source annotations in the stream */
sendSources?: boolean;
/** Send start events */
sendStart?: boolean;
/** Send finish events */
sendFinish?: boolean;
/** Error handler */
onError?: (error: unknown) => string;
/** Filter to only include specific event types (if not set, includes all) */
types?: Array<VoltAgentTextStreamPart["type"]>;
};

export type StreamTextResultWithContext<
TOOLS extends ToolSet = Record<string, any>,
OUTPUT = unknown,
Expand All @@ -288,6 +425,11 @@ export type StreamTextResultWithContext<
pipeUIMessageStreamToResponse: AIStreamTextResult<TOOLS, any>["pipeUIMessageStreamToResponse"];
pipeTextStreamToResponse: AIStreamTextResult<TOOLS, any>["pipeTextStreamToResponse"];
toTextStreamResponse: AIStreamTextResult<TOOLS, any>["toTextStreamResponse"];
// Convert fullStream to UIMessageStream with subagent metadata preserved
fullStreamToUIMessageStream: (
options?: FullStreamToUIMessageStreamOptions,
) => AsyncIterable<UIMessageChunkWithMetadata>;
fullStreamToUIMessageStreamResponse: (options?: FullStreamToUIMessageStreamOptions) => Response;
// Additional context field
context: Map<string | symbol, unknown>;
// Feedback metadata for the trace, if enabled
Expand Down Expand Up @@ -2040,6 +2182,58 @@ export class Agent {
...(init ?? {}),
});
},
fullStreamToUIMessageStream: (streamOptions?: FullStreamToUIMessageStreamOptions) => {
const opts: FullStreamToUIMessageStreamOptions = {
sendReasoning: streamOptions?.sendReasoning ?? true,
sendSources: streamOptions?.sendSources ?? true,
sendStart: streamOptions?.sendStart ?? true,
sendFinish: streamOptions?.sendFinish ?? true,
onError: streamOptions?.onError ?? ((e: unknown) => String(e)),
types: streamOptions?.types,
};

const fullStream = getGuardrailAwareFullStream();

async function* generateUIStream() {
for await (const part of fullStream) {
const converted = convertFullStreamPartToUIMessageChunk(part, opts);
if (converted) {
yield converted;
}
}
}

return generateUIStream();
},
fullStreamToUIMessageStreamResponse: (
streamOptions?: FullStreamToUIMessageStreamOptions,
) => {
const opts: FullStreamToUIMessageStreamOptions = {
sendReasoning: streamOptions?.sendReasoning ?? true,
sendSources: streamOptions?.sendSources ?? true,
sendStart: streamOptions?.sendStart ?? true,
sendFinish: streamOptions?.sendFinish ?? true,
onError: streamOptions?.onError ?? ((e: unknown) => String(e)),
types: streamOptions?.types,
};

const fullStream = getGuardrailAwareFullStream();

const uiStream = createUIMessageStream({
execute: async ({ writer }) => {
for await (const part of fullStream) {
const converted = convertFullStreamPartToUIMessageChunk(part, opts);
if (converted) {
// Cast needed: UIMessageChunkWithMetadata extends base UI chunk with subagent metadata
writer.write(converted as Parameters<typeof writer.write>[0]);
}
}
},
onError: opts.onError,
});

return createUIMessageStreamResponse({ stream: uiStream });
},
context: oc.context,
get feedback() {
return feedbackValue;
Expand Down
Loading