diff --git a/.changeset/every-icons-rest.md b/.changeset/every-icons-rest.md new file mode 100644 index 00000000000..7ca3f7df8f2 --- /dev/null +++ b/.changeset/every-icons-rest.md @@ -0,0 +1,5 @@ +--- +'@mastra/core': patch +--- + +Fixed streamed supervisor narration being included in the final answer text. diff --git a/packages/core/src/agent/__tests__/supervisor-integration.test.ts b/packages/core/src/agent/__tests__/supervisor-integration.test.ts index 44827f058c5..53b3ed6b977 100644 --- a/packages/core/src/agent/__tests__/supervisor-integration.test.ts +++ b/packages/core/src/agent/__tests__/supervisor-integration.test.ts @@ -891,6 +891,101 @@ describe('Supervisor Pattern - Tool approval propagation', () => { expect(mockFindUser).toHaveBeenCalled(); }); + it('should stream sub-agent text without including it in the supervisor final text', async () => { + const subAgent = new Agent({ + id: 'narration-sub-agent', + name: 'Narration Sub Agent', + description: 'Streams text back to the supervisor.', + instructions: 'You answer delegated prompts.', + model: new MockLanguageModelV2({ + doStream: async () => ({ + rawCall: { rawPrompt: null, rawSettings: {} }, + warnings: [], + stream: convertArrayToReadableStream([ + { type: 'stream-start', warnings: [] }, + { type: 'response-metadata', id: 'sub-id-0', modelId: 'mock-model-id', timestamp: new Date(0) }, + { type: 'text-start', id: 'sub-text-1' }, + { type: 'text-delta', id: 'sub-text-1', delta: 'Sub-agent streamed answer.' }, + { type: 'text-end', id: 'sub-text-1' }, + { + type: 'finish', + finishReason: 'stop', + usage: { inputTokens: 5, outputTokens: 10, totalTokens: 15 }, + }, + ]), + }), + }), + }); + + let supervisorCallCount = 0; + const supervisorAgent = new Agent({ + id: 'narration-supervisor', + name: 'Narration Supervisor', + instructions: 'You orchestrate sub-agents.', + model: new MockLanguageModelV2({ + doStream: async () => { + supervisorCallCount++; + if (supervisorCallCount === 1) { + return { + rawCall: { rawPrompt: null, rawSettings: {} }, + warnings: [], + stream: convertArrayToReadableStream([ + { type: 'stream-start', warnings: [] }, + { type: 'response-metadata', id: 'supervisor-id-0', modelId: 'mock-model-id', timestamp: new Date(0) }, + { type: 'text-start', id: 'supervisor-narration-1' }, + { type: 'text-delta', id: 'supervisor-narration-1', delta: 'Supervisor narration before delegation.' }, + { type: 'text-end', id: 'supervisor-narration-1' }, + { + type: 'tool-call', + toolCallId: 'supervisor-call-1', + toolName: 'agent-narrationSubAgent', + input: JSON.stringify({ prompt: 'answer from sub-agent' }), + }, + { + type: 'finish', + finishReason: 'tool-calls', + usage: { inputTokens: 10, outputTokens: 20, totalTokens: 30 }, + }, + ]), + }; + } + + return { + rawCall: { rawPrompt: null, rawSettings: {} }, + warnings: [], + stream: convertArrayToReadableStream([ + { type: 'stream-start', warnings: [] }, + { type: 'response-metadata', id: 'supervisor-id-1', modelId: 'mock-model-id', timestamp: new Date(0) }, + { type: 'text-start', id: 'supervisor-text-1' }, + { type: 'text-delta', id: 'supervisor-text-1', delta: 'Supervisor final answer.' }, + { type: 'text-end', id: 'supervisor-text-1' }, + { + type: 'finish', + finishReason: 'stop', + usage: { inputTokens: 10, outputTokens: 20, totalTokens: 30 }, + }, + ]), + }; + }, + }), + agents: { narrationSubAgent: subAgent }, + memory: new MockMemory(), + }); + + const stream = await supervisorAgent.stream('Delegate and answer', { maxSteps: 3 }); + const streamedText: string[] = []; + + for await (const chunk of stream.fullStream) { + if (chunk.type === 'text-delta') { + streamedText.push(chunk.payload.text); + } + } + + expect(streamedText).toContain('Supervisor narration before delegation.'); + expect(streamedText).toContain('Supervisor final answer.'); + await expect(stream.text).resolves.toBe('Supervisor final answer.'); + }); + it('should propagate tool approval decline from sub-agent through supervisor stream', async () => { const mockFindUser = vi.fn().mockResolvedValue({ name: 'Bob', email: 'bob@example.com' }); diff --git a/packages/core/src/agent/agent.ts b/packages/core/src/agent/agent.ts index 9993e6e7d01..2bfa95357c7 100644 --- a/packages/core/src/agent/agent.ts +++ b/packages/core/src/agent/agent.ts @@ -5017,7 +5017,18 @@ export class Agent< resumeSchema = chunk.data.resumeSchema; } } else { - await context.writer.write(chunk); + const chunkFromSubAgent = + chunk.type === 'text-start' || chunk.type === 'text-delta' || chunk.type === 'text-end' + ? { + ...chunk, + metadata: { + ...chunk.metadata, + subAgentId: agent.id, + __mastraExcludeFromOutputText: true, + }, + } + : chunk; + await context.writer.write(chunkFromSubAgent); if (chunk.type === 'tool-call-approval') { suspendedPayload = {}; requireToolApproval = true; @@ -5125,7 +5136,18 @@ export class Agent< // Write data chunks directly to original stream to bubble up await context.writer.custom(chunk as any); } else { - await context.writer.write(chunk); + const chunkFromSubAgent = + chunk.type === 'text-start' || chunk.type === 'text-delta' || chunk.type === 'text-end' + ? { + ...chunk, + metadata: { + ...chunk.metadata, + subAgentId: agent.id, + __mastraExcludeFromOutputText: true, + }, + } + : chunk; + await context.writer.write(chunkFromSubAgent); } } diff --git a/packages/core/src/stream/base/output.ts b/packages/core/src/stream/base/output.ts index 2eab6d8537b..3aa5eceeea4 100644 --- a/packages/core/src/stream/base/output.ts +++ b/packages/core/src/stream/base/output.ts @@ -153,6 +153,7 @@ export class MastraModelOutput extends MastraBase { #emitter = new EventEmitter(); #bufferedSteps: LLMStepResult[] = []; #bufferedReasoningDetails: Record['reasoning'][number]> = {}; + #bufferedTextStepStartIndex = 0; #bufferedByStep: LLMStepResult = { text: '', reasoning: [], @@ -499,12 +500,14 @@ export class MastraModelOutput extends MastraBase { self.#bufferedByStep.sources.push(chunk); break; case 'text-delta': - self.#bufferedText.push(chunk.payload.text); - self.#bufferedByStep.text += chunk.payload.text; - if (chunk.payload.id) { - const ary = self.#bufferedTextChunks[chunk.payload.id] ?? []; - ary.push(chunk.payload.text); - self.#bufferedTextChunks[chunk.payload.id] = ary; + if (!chunk.metadata?.__mastraExcludeFromOutputText) { + self.#bufferedText.push(chunk.payload.text); + self.#bufferedByStep.text += chunk.payload.text; + if (chunk.payload.id) { + const ary = self.#bufferedTextChunks[chunk.payload.id] ?? []; + ary.push(chunk.payload.text); + self.#bufferedTextChunks[chunk.payload.id] = ary; + } } break; case 'tool-call-input-streaming-start': @@ -734,6 +737,11 @@ export class MastraModelOutput extends MastraBase { self.#bufferedSteps.push(stepResult); + if (chunk.payload.stepResult.reason === 'tool-calls') { + self.#bufferedText.splice(self.#bufferedTextStepStartIndex); + } + self.#bufferedTextStepStartIndex = self.#bufferedText.length; + self.#bufferedByStep = { text: '', reasoning: [], @@ -1796,6 +1804,7 @@ export class MastraModelOutput extends MastraBase { bufferedReasoningDetails: this.#bufferedReasoningDetails, bufferedByStep: this.#bufferedByStep, bufferedText: this.#bufferedText, + bufferedTextStepStartIndex: this.#bufferedTextStepStartIndex, bufferedTextChunks: this.#bufferedTextChunks, bufferedSources: this.#bufferedSources, bufferedReasoning: this.#bufferedReasoning, @@ -1820,6 +1829,7 @@ export class MastraModelOutput extends MastraBase { this.#bufferedReasoningDetails = state.bufferedReasoningDetails; this.#bufferedByStep = state.bufferedByStep; this.#bufferedText = state.bufferedText; + this.#bufferedTextStepStartIndex = state.bufferedTextStepStartIndex ?? this.#bufferedText.length; this.#bufferedTextChunks = state.bufferedTextChunks; this.#bufferedSources = state.bufferedSources; this.#bufferedReasoning = state.bufferedReasoning;