Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/every-icons-rest.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@mastra/core': patch
---

Fixed streamed supervisor narration being included in the final answer text.
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,101 @@ describe('Supervisor Pattern - Tool approval propagation', () => {
expect(mockFindUser).toHaveBeenCalled();
});

it('should stream sub-agent text without including it in the supervisor final text', async () => {
const subAgent = new Agent({
id: 'narration-sub-agent',
name: 'Narration Sub Agent',
description: 'Streams text back to the supervisor.',
instructions: 'You answer delegated prompts.',
model: new MockLanguageModelV2({
doStream: async () => ({
rawCall: { rawPrompt: null, rawSettings: {} },
warnings: [],
stream: convertArrayToReadableStream([
{ type: 'stream-start', warnings: [] },
{ type: 'response-metadata', id: 'sub-id-0', modelId: 'mock-model-id', timestamp: new Date(0) },
{ type: 'text-start', id: 'sub-text-1' },
{ type: 'text-delta', id: 'sub-text-1', delta: 'Sub-agent streamed answer.' },
{ type: 'text-end', id: 'sub-text-1' },
{
type: 'finish',
finishReason: 'stop',
usage: { inputTokens: 5, outputTokens: 10, totalTokens: 15 },
},
]),
}),
}),
});

let supervisorCallCount = 0;
const supervisorAgent = new Agent({
id: 'narration-supervisor',
name: 'Narration Supervisor',
instructions: 'You orchestrate sub-agents.',
model: new MockLanguageModelV2({
doStream: async () => {
supervisorCallCount++;
if (supervisorCallCount === 1) {
return {
rawCall: { rawPrompt: null, rawSettings: {} },
warnings: [],
stream: convertArrayToReadableStream([
{ type: 'stream-start', warnings: [] },
{ type: 'response-metadata', id: 'supervisor-id-0', modelId: 'mock-model-id', timestamp: new Date(0) },
{ type: 'text-start', id: 'supervisor-narration-1' },
{ type: 'text-delta', id: 'supervisor-narration-1', delta: 'Supervisor narration before delegation.' },
{ type: 'text-end', id: 'supervisor-narration-1' },
{
type: 'tool-call',
toolCallId: 'supervisor-call-1',
toolName: 'agent-narrationSubAgent',
input: JSON.stringify({ prompt: 'answer from sub-agent' }),
},
{
type: 'finish',
finishReason: 'tool-calls',
usage: { inputTokens: 10, outputTokens: 20, totalTokens: 30 },
},
]),
};
}

return {
rawCall: { rawPrompt: null, rawSettings: {} },
warnings: [],
stream: convertArrayToReadableStream([
{ type: 'stream-start', warnings: [] },
{ type: 'response-metadata', id: 'supervisor-id-1', modelId: 'mock-model-id', timestamp: new Date(0) },
{ type: 'text-start', id: 'supervisor-text-1' },
{ type: 'text-delta', id: 'supervisor-text-1', delta: 'Supervisor final answer.' },
{ type: 'text-end', id: 'supervisor-text-1' },
{
type: 'finish',
finishReason: 'stop',
usage: { inputTokens: 10, outputTokens: 20, totalTokens: 30 },
},
]),
};
},
}),
agents: { narrationSubAgent: subAgent },
memory: new MockMemory(),
});

const stream = await supervisorAgent.stream('Delegate and answer', { maxSteps: 3 });
const streamedText: string[] = [];

for await (const chunk of stream.fullStream) {
if (chunk.type === 'text-delta') {
streamedText.push(chunk.payload.text);
}
}

expect(streamedText).toContain('Supervisor narration before delegation.');
expect(streamedText).toContain('Supervisor final answer.');
await expect(stream.text).resolves.toBe('Supervisor final answer.');
});

it('should propagate tool approval decline from sub-agent through supervisor stream', async () => {
const mockFindUser = vi.fn().mockResolvedValue({ name: 'Bob', email: 'bob@example.com' });

Expand Down
26 changes: 24 additions & 2 deletions packages/core/src/agent/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4949,7 +4949,18 @@ export class Agent<
resumeSchema = chunk.data.resumeSchema;
}
} else {
await context.writer.write(chunk);
const chunkFromSubAgent =
chunk.type === 'text-start' || chunk.type === 'text-delta' || chunk.type === 'text-end'
? {
...chunk,
metadata: {
...chunk.metadata,
subAgentId: agent.id,
__mastraExcludeFromOutputText: true,
},
}
: chunk;
await context.writer.write(chunkFromSubAgent);
if (chunk.type === 'tool-call-approval') {
suspendedPayload = {};
requireToolApproval = true;
Expand Down Expand Up @@ -5057,7 +5068,18 @@ export class Agent<
// Write data chunks directly to original stream to bubble up
await context.writer.custom(chunk as any);
} else {
await context.writer.write(chunk);
const chunkFromSubAgent =
chunk.type === 'text-start' || chunk.type === 'text-delta' || chunk.type === 'text-end'
? {
...chunk,
metadata: {
...chunk.metadata,
subAgentId: agent.id,
__mastraExcludeFromOutputText: true,
},
}
: chunk;
await context.writer.write(chunkFromSubAgent);
}
}

Expand Down
22 changes: 16 additions & 6 deletions packages/core/src/stream/base/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ export class MastraModelOutput<OUTPUT = undefined> extends MastraBase {
#emitter = new EventEmitter();
#bufferedSteps: LLMStepResult<OUTPUT>[] = [];
#bufferedReasoningDetails: Record<string, LLMStepResult<OUTPUT>['reasoning'][number]> = {};
#bufferedTextStepStartIndex = 0;
#bufferedByStep: LLMStepResult<OUTPUT> = {
text: '',
reasoning: [],
Expand Down Expand Up @@ -499,12 +500,14 @@ export class MastraModelOutput<OUTPUT = undefined> extends MastraBase {
self.#bufferedByStep.sources.push(chunk);
break;
case 'text-delta':
self.#bufferedText.push(chunk.payload.text);
self.#bufferedByStep.text += chunk.payload.text;
if (chunk.payload.id) {
const ary = self.#bufferedTextChunks[chunk.payload.id] ?? [];
ary.push(chunk.payload.text);
self.#bufferedTextChunks[chunk.payload.id] = ary;
if (!chunk.metadata?.__mastraExcludeFromOutputText) {
self.#bufferedText.push(chunk.payload.text);
self.#bufferedByStep.text += chunk.payload.text;
if (chunk.payload.id) {
const ary = self.#bufferedTextChunks[chunk.payload.id] ?? [];
ary.push(chunk.payload.text);
self.#bufferedTextChunks[chunk.payload.id] = ary;
}
}
break;
case 'tool-call-input-streaming-start':
Expand Down Expand Up @@ -734,6 +737,11 @@ export class MastraModelOutput<OUTPUT = undefined> extends MastraBase {

self.#bufferedSteps.push(stepResult);

if (chunk.payload.stepResult.reason === 'tool-calls') {
self.#bufferedText.splice(self.#bufferedTextStepStartIndex);
}
self.#bufferedTextStepStartIndex = self.#bufferedText.length;

Comment on lines +740 to +744

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

printf '--- packages/core/AGENTS.md ---\n'
cat packages/core/AGENTS.md

printf '\n--- outline: packages/core/src/stream/base/output.ts ---\n'
ast-grep outline packages/core/src/stream/base/output.ts --view expanded

printf '\n--- relevant slices ---\n'
sed -n '680,780p' packages/core/src/stream/base/output.ts

printf '\n--- search for getFullOutput / _getImmediateText / bufferedByStep ---\n'
rg -n "getFullOutput|_getImmediateText|bufferedByStep|bufferedTextStepStartIndex|tool-calls|stepResult.text|steps\.map\(step => step\.text\)" packages/core/src/stream/base/output.ts packages/core/src -S

Repository: mastra-ai/mastra

Length of output: 245


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo '--- packages/core/AGENTS.md ---'
cat packages/core/AGENTS.md

echo
echo '--- outline: packages/core/src/stream/base/output.ts ---'
ast-grep outline packages/core/src/stream/base/output.ts --view expanded

echo
echo '--- relevant slices ---'
sed -n '680,780p' packages/core/src/stream/base/output.ts

echo
echo '--- search for getFullOutput / _getImmediateText / bufferedByStep ---'
rg -n "getFullOutput|_getImmediateText|bufferedByStep|bufferedTextStepStartIndex|tool-calls|stepResult.text|steps\\.map\\(step => step\\.text\\)" packages/core/src/stream/base/output.ts packages/core/src -S

Repository: mastra-ai/mastra

Length of output: 50372


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo '--- text getter / full output / immediate text ---'
sed -n '1210,1465p' packages/core/src/stream/base/output.ts
echo
sed -n '1598,1635p' packages/core/src/stream/base/output.ts

echo
echo '--- focused tests around getFullOutput text shape ---'
sed -n '200,260p' packages/core/src/stream/base/output.test.ts
echo
sed -n '860,960p' packages/core/src/stream/base/output.test.ts

Repository: mastra-ai/mastra

Length of output: 14226


Keep the tool-call truncation aligned across outputs

splice() only trims #bufferedText, but stepResult.text is already copied into #bufferedSteps, so getFullOutput().text still includes the pre-tool narration while stream.text drops it. The reason === 'tool-calls' check also applies to every tool-call step, not just delegation, so this changes output for any agent that speaks before calling a regular tool.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/stream/base/output.ts` around lines 740 - 744, The
tool-call truncation in the output buffering logic is inconsistent because
`#bufferedText` is trimmed while the already-copied `stepResult.text` in
`#bufferedSteps` is left intact, so `getFullOutput().text` and `stream.text`
diverge. Update the handling in `Output` around `#bufferedTextStepStartIndex`
and the `reason === 'tool-calls'` branch so both buffered text and step output
are truncated consistently when a tool-call step occurs, and ensure this
behavior only applies to the intended delegation case rather than every regular
tool-call step.

self.#bufferedByStep = {
text: '',
reasoning: [],
Expand Down Expand Up @@ -1796,6 +1804,7 @@ export class MastraModelOutput<OUTPUT = undefined> extends MastraBase {
bufferedReasoningDetails: this.#bufferedReasoningDetails,
bufferedByStep: this.#bufferedByStep,
bufferedText: this.#bufferedText,
bufferedTextStepStartIndex: this.#bufferedTextStepStartIndex,
bufferedTextChunks: this.#bufferedTextChunks,
bufferedSources: this.#bufferedSources,
bufferedReasoning: this.#bufferedReasoning,
Expand All @@ -1820,6 +1829,7 @@ export class MastraModelOutput<OUTPUT = undefined> extends MastraBase {
this.#bufferedReasoningDetails = state.bufferedReasoningDetails;
this.#bufferedByStep = state.bufferedByStep;
this.#bufferedText = state.bufferedText;
this.#bufferedTextStepStartIndex = state.bufferedTextStepStartIndex ?? this.#bufferedText.length;
this.#bufferedTextChunks = state.bufferedTextChunks;
this.#bufferedSources = state.bufferedSources;
this.#bufferedReasoning = state.bufferedReasoning;
Expand Down
Loading