Skip to content

Commit cb72d20

Browse files
NikAiyerclaude
andauthored
fix: apply output processors to messages saved during network execution (#12346)
## Description When using `agent.network()`, output processors configured on the agent were not being applied to messages saved to storage. This broke functionality like trace ID injection for feedback attribution (e.g., Braintrust integration). **Root cause**: The network loop saves messages directly via `memory.saveMessages()` at ~10 locations, bypassing the output processor pipeline that normally runs during `agent.generate()`. **Solution**: - Created a `saveMessagesWithProcessors()` helper function that applies output processors via `ProcessorRunner` before saving - Created a `ProcessorRunner` instance in `createNetworkLoop()` using the agent's configured output processors - Replaced all `memory.saveMessages()` calls with the helper function This ensures output processors are applied to all messages saved during network execution including: - Router direct responses - Completion check results - Agent/workflow execution results - Tool execution results ## Related Issue(s) Fixes #12300 ## Type of Change - [x] Bug fix (non-breaking change that fixes an issue) - [ ] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update - [ ] Code refactoring - [ ] Performance improvement - [ ] Test update ## Checklist - [ ] I have made corresponding changes to the documentation (if applicable) - [x] I have added tests that prove my fix is effective or that my feature works 🤖 Generated with [Claude Code](https://claude.ai/claude-code) <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **Bug Fixes** * Output processors are now correctly applied to messages saved during network execution, ensuring configured processors (e.g., trace injectors) run on all persisted messages. * **Tests** * Added test coverage verifying output processors are applied to saved messages in network flows. * **Chores** * Added a changeset entry to publish the patch fixing processor application. <sub>✏️ Tip: You can customize this high-level summary in your review settings.</sub> <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 95df5c1 commit cb72d20

File tree

3 files changed

+290
-49
lines changed

3 files changed

+290
-49
lines changed

.changeset/pink-sloths-press.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@mastra/core': patch
3+
---
4+
5+
Fixed output processors not being applied to messages saved during network execution. When using agent.network(), configured output processors (like TraceIdInjector for feedback attribution) are now correctly applied to all messages before they are saved to storage.

packages/core/src/agent/agent-network.test.ts

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5468,3 +5468,156 @@ describe('Agent - network - message history transfer to sub-agents', () => {
54685468
expect(secondCallPrompt).not.toContain('selectionReason');
54695469
});
54705470
});
5471+
5472+
describe('Agent - network - output processors', () => {
5473+
it('should apply output processors to messages saved during network execution', async () => {
5474+
// This test verifies that output processors (like TraceIdInjector for Braintrust)
5475+
// are applied to messages saved during network execution.
5476+
// Issue: https://github.com/mastra-ai/mastra/issues/12300
5477+
5478+
const savedMessages: any[] = [];
5479+
const memory = new MockMemory();
5480+
5481+
// Intercept saveMessages to capture all saved messages
5482+
const originalSaveMessages = memory.saveMessages.bind(memory);
5483+
memory.saveMessages = async (params: any) => {
5484+
savedMessages.push(...params.messages);
5485+
return originalSaveMessages(params);
5486+
};
5487+
5488+
// Create an output processor that adds traceId to assistant messages
5489+
// (similar to the TraceIdInjector from the issue)
5490+
const traceIdProcessor = {
5491+
id: 'trace-id-injector',
5492+
name: 'Trace ID Injector',
5493+
processOutputResult: ({ messages }: { messages: any[] }) => {
5494+
return messages.map((msg: any) => {
5495+
if (msg.role === 'assistant') {
5496+
return {
5497+
...msg,
5498+
content: {
5499+
...msg.content,
5500+
metadata: {
5501+
...msg.content?.metadata,
5502+
traceId: 'test-trace-id-12300',
5503+
},
5504+
},
5505+
};
5506+
}
5507+
return msg;
5508+
});
5509+
},
5510+
};
5511+
5512+
// Create a simple sub-agent
5513+
const subAgentMockModel = new MockLanguageModelV2({
5514+
doGenerate: async () => ({
5515+
rawCall: { rawPrompt: null, rawSettings: {} },
5516+
finishReason: 'stop',
5517+
usage: { inputTokens: 10, outputTokens: 20, totalTokens: 30 },
5518+
content: [{ type: 'text', text: 'Sub-agent response' }],
5519+
warnings: [],
5520+
}),
5521+
doStream: async () => ({
5522+
stream: convertArrayToReadableStream([
5523+
{ type: 'stream-start', warnings: [] },
5524+
{ type: 'response-metadata', id: 'id-0', modelId: 'mock-model-id', timestamp: new Date(0) },
5525+
{ type: 'text-delta', id: 'id-0', delta: 'Sub-agent response' },
5526+
{ type: 'finish', finishReason: 'stop', usage: { inputTokens: 10, outputTokens: 20, totalTokens: 30 } },
5527+
]),
5528+
}),
5529+
});
5530+
5531+
const subAgent = new Agent({
5532+
id: 'subAgent',
5533+
name: 'Sub Agent',
5534+
description: 'A sub-agent for testing',
5535+
instructions: 'Do the task.',
5536+
model: subAgentMockModel,
5537+
});
5538+
5539+
// Routing agent selects sub-agent, then marks complete
5540+
const routingSelectAgent = JSON.stringify({
5541+
primitiveId: 'subAgent',
5542+
primitiveType: 'agent',
5543+
prompt: 'Do the task',
5544+
selectionReason: 'Sub-agent can handle this',
5545+
});
5546+
5547+
const completionResponse = JSON.stringify({
5548+
isComplete: true,
5549+
finalResult: 'Task completed',
5550+
completionReason: 'Sub-agent completed the request',
5551+
});
5552+
5553+
let routingCallCount = 0;
5554+
const routingMockModel = new MockLanguageModelV2({
5555+
doGenerate: async () => {
5556+
routingCallCount++;
5557+
const text = routingCallCount === 1 ? routingSelectAgent : completionResponse;
5558+
return {
5559+
rawCall: { rawPrompt: null, rawSettings: {} },
5560+
finishReason: 'stop',
5561+
usage: { inputTokens: 10, outputTokens: 20, totalTokens: 30 },
5562+
content: [{ type: 'text', text }],
5563+
warnings: [],
5564+
};
5565+
},
5566+
doStream: async () => {
5567+
routingCallCount++;
5568+
const text = routingCallCount === 1 ? routingSelectAgent : completionResponse;
5569+
return {
5570+
stream: convertArrayToReadableStream([
5571+
{ type: 'stream-start', warnings: [] },
5572+
{ type: 'response-metadata', id: 'id-0', modelId: 'mock-model-id', timestamp: new Date(0) },
5573+
{ type: 'text-delta', id: 'id-0', delta: text },
5574+
{ type: 'finish', finishReason: 'stop', usage: { inputTokens: 10, outputTokens: 20, totalTokens: 30 } },
5575+
]),
5576+
};
5577+
},
5578+
});
5579+
5580+
// Create network agent with the output processor
5581+
const networkAgent = new Agent({
5582+
id: 'network-agent-with-processor',
5583+
name: 'Network Agent with Output Processor',
5584+
instructions: 'Delegate tasks to sub-agents.',
5585+
model: routingMockModel,
5586+
agents: { subAgent },
5587+
memory,
5588+
outputProcessors: [traceIdProcessor],
5589+
});
5590+
5591+
const anStream = await networkAgent.network('Do the task', {
5592+
memory: {
5593+
thread: 'test-thread-output-processors',
5594+
resource: 'test-resource-output-processors',
5595+
},
5596+
});
5597+
5598+
// Consume the stream
5599+
for await (const _chunk of anStream) {
5600+
// Process stream
5601+
}
5602+
5603+
// Verify that at least some messages were saved
5604+
expect(savedMessages.length).toBeGreaterThan(0);
5605+
5606+
// Find assistant messages saved during network execution
5607+
const assistantMessages = savedMessages.filter((msg: any) => msg.role === 'assistant');
5608+
expect(assistantMessages.length).toBeGreaterThan(0);
5609+
5610+
// The key assertion: output processors should have been applied,
5611+
// so assistant messages should have the traceId in their metadata
5612+
const messagesWithTraceId = assistantMessages.filter(
5613+
(msg: any) => msg.content?.metadata?.traceId === 'test-trace-id-12300',
5614+
);
5615+
5616+
// This assertion will FAIL if output processors are not applied to saved messages
5617+
expect(
5618+
messagesWithTraceId.length,
5619+
'Output processors should be applied to assistant messages saved during network execution. ' +
5620+
'Expected at least one assistant message to have traceId in metadata.',
5621+
).toBeGreaterThan(0);
5622+
});
5623+
});

0 commit comments

Comments
 (0)