Skip to content
194 changes: 194 additions & 0 deletions packages/core/src/agent/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,128 @@ const callToolParameters = z.object({
// Types
// ============================================================================

// Subagent metadata keys from VoltAgentTextStreamPart
const SUBAGENT_METADATA_KEYS = [
  "subAgentId",
  "subAgentName",
  "executingAgentId",
  "executingAgentName",
  "parentAgentId",
  "parentAgentName",
  "agentPath",
] as const;

/**
 * Collects the subagent metadata fields that are present on a stream part.
 *
 * Keys whose value is null or undefined are omitted from the result, so a
 * later object spread of the returned metadata cannot clobber existing
 * fields with undefined.
 */
function extractSubagentMetadata(
  part: VoltAgentTextStreamPart,
): Pick<VoltAgentTextStreamPart, (typeof SUBAGENT_METADATA_KEYS)[number]> {
  const presentEntries = SUBAGENT_METADATA_KEYS.filter((key) => part[key] != null).map(
    (key) => [key, part[key]] as const,
  );
  // Cast: Pick claims every key is present, but absent ones are intentionally
  // left out — callers only spread the result.
  return Object.fromEntries(presentEntries) as Pick<
    VoltAgentTextStreamPart,
    (typeof SUBAGENT_METADATA_KEYS)[number]
  >;
}

/**
 * Converts a fullStream part to UIMessageChunk format with subagent metadata
 * preserved on every emitted chunk. Returns undefined for parts that are
 * filtered out by the options or have no UI chunk representation.
 * @internal Exported for testing purposes
 */
export function convertFullStreamPartToUIMessageChunk(
  part: VoltAgentTextStreamPart,
  options: FullStreamToUIMessageStreamOptions,
): Record<string, unknown> | undefined {
  // Honour the optional event-type filter before doing any conversion work.
  // The filter matches on the INPUT part type, not the emitted chunk type.
  const { types } = options;
  if (types && types.length > 0 && !types.includes(part.type)) {
    return undefined;
  }

  const subagentMeta = extractSubagentMetadata(part);

  if (part.type === "text-delta") {
    return { type: "text-delta", id: part.id, delta: part.text, ...subagentMeta };
  }

  if (part.type === "reasoning-delta") {
    return options.sendReasoning
      ? { type: "reasoning-delta", id: part.id, delta: part.text, ...subagentMeta }
      : undefined;
  }

  if (part.type === "source") {
    // NOTE(review): passes the part through with its original "source" type —
    // confirm downstream consumers accept "source" (AI SDK UI chunks use
    // "source-url" / "source-document").
    return options.sendSources ? { ...part, ...subagentMeta } : undefined;
  }

  if (part.type === "tool-call") {
    // Older part shapes carry "args" instead of "input".
    const input = "input" in part ? part.input : (part as Record<string, unknown>).args;
    return {
      type: "tool-input-available",
      toolCallId: part.toolCallId,
      toolName: part.toolName,
      input,
      ...subagentMeta,
    };
  }

  if (part.type === "tool-result") {
    // Older part shapes carry "result" instead of "output".
    const output = "output" in part ? part.output : (part as Record<string, unknown>).result;
    return {
      type: "tool-output-available",
      toolCallId: part.toolCallId,
      output,
      ...subagentMeta,
    };
  }

  if (part.type === "tool-input-start") {
    return {
      type: "tool-input-start",
      toolCallId: part.id,
      toolName: part.toolName,
      ...subagentMeta,
    };
  }

  if (part.type === "tool-input-delta") {
    return {
      type: "tool-input-delta",
      toolCallId: part.id,
      inputTextDelta: part.delta,
      ...subagentMeta,
    };
  }

  if (part.type === "tool-error") {
    return {
      type: "tool-output-error",
      toolCallId: part.toolCallId,
      errorText: String(part.error),
      ...subagentMeta,
    };
  }

  // Lifecycle events: start/start-step are gated by sendStart,
  // finish/finish-step by sendFinish.
  if (part.type === "start" || part.type === "start-step") {
    return options.sendStart ? { type: part.type, ...subagentMeta } : undefined;
  }

  if (part.type === "finish-step") {
    return options.sendFinish
      ? { type: "finish-step", finishReason: part.finishReason, usage: part.usage, ...subagentMeta }
      : undefined;
  }

  if (part.type === "finish") {
    return options.sendFinish
      ? { type: "finish", finishReason: part.finishReason, usage: part.totalUsage, ...subagentMeta }
      : undefined;
  }

  if (part.type === "error") {
    const errorText = options.onError ? options.onError(part.error) : String(part.error);
    return { type: "error", error: errorText, ...subagentMeta };
  }

  // Any other part kind (raw chunks, abort, etc.) has no UI representation.
  return undefined;
}

// Specification of a structured output, as modeled by Output.Output.
export type OutputSpec = Output.Output<unknown, unknown>;
// The generated value type inferred from a given output spec.
type OutputValue<OUTPUT extends OutputSpec> = InferGenerateOutput<OUTPUT>;

Expand Down Expand Up @@ -271,6 +393,24 @@ function sanitizeConversationTitle(text: string, maxLength: number): string {
/**
 * Options for fullStreamToUIMessageStream / fullStreamToUIMessageStreamResponse
 * conversion. All send* flags default to true at the call sites.
 */
export type FullStreamToUIMessageStreamOptions = {
  /** Include reasoning/thinking content in the stream */
  sendReasoning?: boolean;
  /** Include source annotations in the stream */
  sendSources?: boolean;
  /** Send start and start-step events */
  sendStart?: boolean;
  /** Send finish and finish-step events */
  sendFinish?: boolean;
  /** Maps a stream error to the error text emitted in "error" chunks */
  onError?: (error: unknown) => string;
  /** Filter to only include specific input event types (if not set, includes all) */
  types?: string[];
};

/**
 * Extended StreamTextResult that includes context
 */

export type StreamTextResultWithContext<
TOOLS extends ToolSet = Record<string, any>,
OUTPUT = unknown,
Expand All @@ -288,6 +428,9 @@ export type StreamTextResultWithContext<
pipeUIMessageStreamToResponse: AIStreamTextResult<TOOLS, any>["pipeUIMessageStreamToResponse"];
pipeTextStreamToResponse: AIStreamTextResult<TOOLS, any>["pipeTextStreamToResponse"];
toTextStreamResponse: AIStreamTextResult<TOOLS, any>["toTextStreamResponse"];
// Convert fullStream to UIMessageStream with subagent metadata preserved
fullStreamToUIMessageStream: (options?: FullStreamToUIMessageStreamOptions) => AsyncIterable<any>;
fullStreamToUIMessageStreamResponse: (options?: FullStreamToUIMessageStreamOptions) => Response;
// Additional context field
context: Map<string | symbol, unknown>;
// Feedback metadata for the trace, if enabled
Expand Down Expand Up @@ -2040,6 +2183,57 @@ export class Agent {
...(init ?? {}),
});
},
fullStreamToUIMessageStream: (streamOptions?: FullStreamToUIMessageStreamOptions) => {
  // Resolve defaults: emit everything unless explicitly disabled,
  // and stringify errors when no handler is supplied.
  const resolved: FullStreamToUIMessageStreamOptions = {
    sendReasoning: streamOptions?.sendReasoning ?? true,
    sendSources: streamOptions?.sendSources ?? true,
    sendStart: streamOptions?.sendStart ?? true,
    sendFinish: streamOptions?.sendFinish ?? true,
    onError: streamOptions?.onError ?? ((err: unknown) => String(err)),
    types: streamOptions?.types,
  };

  const source = getGuardrailAwareFullStream();

  // Lazily convert each fullStream part, dropping parts that do not map
  // to a UI message chunk under the resolved options.
  return (async function* () {
    for await (const streamPart of source) {
      const uiChunk = convertFullStreamPartToUIMessageChunk(streamPart, resolved);
      if (uiChunk) {
        yield uiChunk;
      }
    }
  })();
},
fullStreamToUIMessageStreamResponse: (
  streamOptions?: FullStreamToUIMessageStreamOptions,
) => {
  // Resolve defaults: emit everything unless explicitly disabled,
  // and stringify errors when no handler is supplied.
  const resolved: FullStreamToUIMessageStreamOptions = {
    sendReasoning: streamOptions?.sendReasoning ?? true,
    sendSources: streamOptions?.sendSources ?? true,
    sendStart: streamOptions?.sendStart ?? true,
    sendFinish: streamOptions?.sendFinish ?? true,
    onError: streamOptions?.onError ?? ((err: unknown) => String(err)),
    types: streamOptions?.types,
  };

  const source = getGuardrailAwareFullStream();

  // Pump converted chunks into a UI message stream, then wrap it in a Response.
  const uiStream = createUIMessageStream({
    execute: async ({ writer }) => {
      for await (const streamPart of source) {
        const uiChunk = convertFullStreamPartToUIMessageChunk(streamPart, resolved);
        if (uiChunk) {
          writer.write(uiChunk as any);
        }
      }
    },
    onError: resolved.onError,
  });

  return createUIMessageStreamResponse({ stream: uiStream });
},
context: oc.context,
get feedback() {
return feedbackValue;
Expand Down
Loading