@@ -73,14 +73,7 @@ export async function runAgent(
},
);

const result = await processEventStream(
ctx,
eventStream,
itemIndex,
options.returnIntermediateSteps,
memory,
input,
);
const result = await processEventStream(ctx, eventStream, itemIndex);

// If result contains tool calls, build the request object like the normal flow
if (result.toolCalls && result.toolCalls.length > 0) {
@@ -91,6 +84,14 @@ export async function runAgent(
metadata: buildResponseMetadata(response, itemIndex),
};
}
// Save conversation to memory including any tool call context
if (memory && input && result?.output) {
await saveToMemory(input, result.output, memory, steps);
}

if (options.returnIntermediateSteps && steps.length > 0) {
result.intermediateSteps = steps;
}

return result;
} else {
@@ -105,21 +106,7 @@ export async function runAgent(
if ('returnValues' in modelResponse) {
// Save conversation to memory including any tool call context
if (memory && input && modelResponse.returnValues.output) {
// If there were tool calls in this conversation, include them in the context
let fullOutput = modelResponse.returnValues.output as string;

if (steps.length > 0) {
// Include tool call information in the conversation context
const toolContext = steps
.map(
(step) =>
`Tool: ${step.action.tool}, Input: ${JSON.stringify(step.action.toolInput)}, Result: ${step.observation}`,
)
.join('; ');
fullOutput = `[Used tools: ${toolContext}] ${fullOutput}`;
}

await saveToMemory(input, fullOutput, memory);
await saveToMemory(input, modelResponse.returnValues.output, memory, steps);
}
// Include intermediate steps if requested
const result = { ...modelResponse.returnValues };
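Taken together, these hunks make the streaming branch of `runAgent` end the same way as the non-streaming one: a single `saveToMemory` call that folds the tool context into the output, followed by optional `intermediateSteps` attachment. Below is a minimal, self-contained sketch of that shared tail; the interfaces and the `finalizeResult` helper are simplified stand-ins for illustration, not n8n's real types.

```typescript
// Simplified stand-ins for the real n8n/langchain types (assumption).
interface ToolCallData {
  action: { tool: string; toolInput: unknown };
  observation: string;
}

interface AgentResult {
  output: string;
  intermediateSteps?: ToolCallData[];
}

interface MemoryLike {
  saveContext(input: { input: string }, output: { output: string }): Promise<void>;
}

// Hypothetical helper mirroring the shared tail of both runAgent branches.
async function finalizeResult(
  result: AgentResult,
  steps: ToolCallData[],
  returnIntermediateSteps: boolean,
  memory?: MemoryLike,
  input?: string,
): Promise<AgentResult> {
  // One memory write, after all tool calls have resolved (same logic as saveToMemory).
  if (memory && input && result.output) {
    const toolContext = steps
      .map(
        (step) =>
          `Tool: ${step.action.tool}, Input: ${JSON.stringify(step.action.toolInput)}, Result: ${step.observation}`,
      )
      .join('; ');
    const output = steps.length > 0 ? `[Used tools: ${toolContext}] ${result.output}` : result.output;
    await memory.saveContext({ input }, { output });
  }
  // Steps are attached here rather than inside processEventStream.
  if (returnIntermediateSteps && steps.length > 0) {
    result.intermediateSteps = steps;
  }
  return result;
}
```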
@@ -11,7 +11,7 @@
export { createEngineRequests } from './createEngineRequests';
export { buildSteps } from './buildSteps';
export { processEventStream } from './processEventStream';
export { loadMemory, saveToMemory, saveToolResultsToMemory } from './memoryManagement';
export { loadMemory, saveToMemory, buildToolContext } from './memoryManagement';
export type {
ToolCallRequest,
ToolCallData,
@@ -5,6 +5,31 @@ import type { BaseChatMemory } from 'langchain/memory';

import type { ToolCallData } from './types';

/**
* Builds a formatted string representation of tool calls for memory storage.
* This creates a consistent format that can be used across both streaming and non-streaming modes.
*
* @param steps - Array of tool call data with actions and observations
* @returns Formatted string of tool calls separated by semicolons
*
* @example
* ```typescript
* const context = buildToolContext([{
* action: { tool: 'calculator', toolInput: { expression: '2+2' }, ... },
* observation: '4'
* }]);
* // Returns: "Tool: calculator, Input: {"expression":"2+2"}, Result: 4"
* ```
*/
export function buildToolContext(steps: ToolCallData[]): string {
return steps
.map(
(step) =>
`Tool: ${step.action.tool}, Input: ${JSON.stringify(step.action.toolInput)}, Result: ${step.observation}`,
)
.join('; ');
}

/**
* Loads chat history from memory and optionally trims it to fit within token limits.
*
@@ -64,50 +89,16 @@ export async function saveToMemory(
input: string,
output: string,
memory?: BaseChatMemory,
steps?: ToolCallData[],
): Promise<void> {
if (!output || !memory) {
return;
}

await memory.saveContext({ input }, { output });
}

/**
* Saves tool call results to memory as formatted messages.
*
* This preserves the full conversation including tool interactions,
* which is important for agents that need to see their tool usage history.
*
* @param memory - The memory instance to save to
* @param input - The user input that triggered the tool calls
* @param toolResults - Array of tool call results to save
*
* @example
* ```typescript
* await saveToolResultsToMemory(memory, 'Calculate 2+2', [{
* action: {
* tool: 'calculator',
* toolInput: { expression: '2+2' },
* log: 'Using calculator',
* toolCallId: 'call_123',
* type: 'tool_call'
* },
* observation: '4'
* }]);
* ```
*/
export async function saveToolResultsToMemory(
input: string,
toolResults: ToolCallData[],
memory?: BaseChatMemory,
): Promise<void> {
if (!memory || !toolResults.length) {
return;
let fullOutput = output;
if (steps && steps.length > 0) {
const toolContext = buildToolContext(steps);
fullOutput = `[Used tools: ${toolContext}] ${fullOutput}`;
}

// Save each tool call as a formatted message
for (const result of toolResults) {
const toolMessage = `Tool: ${result.action.tool}, Input: ${JSON.stringify(result.action.toolInput)}, Result: ${result.observation}`;
await memory.saveContext({ input }, { output: toolMessage });
}
await memory.saveContext({ input }, { output: fullOutput });
}
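The behavioral change in this file: the deleted `saveToolResultsToMemory` wrote one `saveContext` entry per tool result, while the extended `saveToMemory` now writes a single entry whose output embeds the whole tool context. A hedged usage sketch, with a toy recorder cast to `BaseChatMemory` (only `saveContext` is exercised; the `ToolCallData` shape follows the tests below):

```typescript
import type { BaseChatMemory } from 'langchain/memory';

import { saveToMemory } from './memoryManagement';

// Toy recorder standing in for a real memory implementation.
const saved: Array<{ input: string; output: string }> = [];
const memory = {
  saveContext: async (i: { input: string }, o: { output: string }) => {
    saved.push({ input: i.input, output: o.output });
  },
} as unknown as BaseChatMemory;

await saveToMemory('Calculate 2+2', 'The answer is 4.', memory, [
  {
    action: {
      tool: 'calculator',
      toolInput: { expression: '2+2' },
      log: 'Using calculator',
      toolCallId: 'call_123',
      type: 'tool_call',
    },
    observation: '4',
  },
]);

// saved[0].output:
// [Used tools: Tool: calculator, Input: {"expression":"2+2"}, Result: 4] The answer is 4.
```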
@@ -1,10 +1,8 @@
import type { StreamEvent } from '@langchain/core/dist/tracers/event_stream';
import type { IterableReadableStream } from '@langchain/core/dist/utils/stream';
import type { AIMessageChunk, MessageContentText } from '@langchain/core/messages';
import type { BaseChatMemory } from 'langchain/memory';
import type { IExecuteFunctions } from 'n8n-workflow';

import { saveToMemory, saveToolResultsToMemory } from './memoryManagement';
import type { AgentResult, ToolCallRequest } from './types';

/**
@@ -17,27 +15,17 @@ import type { AgentResult, ToolCallRequest } from './types';
* @param ctx - The execution context
* @param eventStream - The stream of events from the agent
* @param itemIndex - The current item index
* @param returnIntermediateSteps - Whether to capture intermediate steps
* @param memory - Optional memory for saving context
* @param input - The original input prompt
* @returns AgentResult containing output and optional tool calls/steps
*/
export async function processEventStream(
ctx: IExecuteFunctions,
eventStream: IterableReadableStream<StreamEvent>,
itemIndex: number,
returnIntermediateSteps: boolean = false,
memory?: BaseChatMemory,
input?: string,
): Promise<AgentResult> {
const agentResult: AgentResult = {
output: '',
};

if (returnIntermediateSteps) {
agentResult.intermediateSteps = [];
}

const toolCalls: ToolCallRequest[] = [];

ctx.sendChunk('begin', itemIndex);
@@ -84,52 +72,6 @@ export async function processEventStream(
messageLog: [output],
});
}

// Also add to intermediate steps if needed
if (returnIntermediateSteps) {
for (const toolCall of output.tool_calls) {
agentResult.intermediateSteps?.push({
action: {
tool: toolCall.name,
toolInput: toolCall.args,
log:
output.content ||
`Calling ${toolCall.name} with input: ${JSON.stringify(toolCall.args)}`,
messageLog: [output], // Include the full LLM response
toolCallId: toolCall.id || 'unknown',
type: toolCall.type || 'tool_call',
},
observation: '',
});
}
}
}
}
break;
case 'on_tool_end':
// Capture tool execution results and match with action
if (returnIntermediateSteps && event.data && agentResult.intermediateSteps!.length > 0) {
const toolData = event.data as { output?: string };
// Find the matching intermediate step for this tool call
const matchingStep = agentResult.intermediateSteps?.find(
(step) => !step.observation && step.action.tool === event.name,
);
if (matchingStep) {
matchingStep.observation = toolData.output || '';

// Save tool result to memory
if (matchingStep.observation && input) {
await saveToolResultsToMemory(
input,
[
{
action: matchingStep.action,
observation: matchingStep.observation,
},
],
memory,
);
}
}
}
break;
@@ -139,11 +81,6 @@
}
ctx.sendChunk('end', itemIndex);

// Save conversation to memory if memory is connected
if (input && agentResult.output) {
await saveToMemory(input, agentResult.output, memory);
}

// Include collected tool calls in the result
if (toolCalls.length > 0) {
agentResult.toolCalls = toolCalls;
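These deletions remove the second of two memory-write paths: previously the stream handler saved every matched tool observation as its own conversation turn (via `saveToolResultsToMemory`) and then the final answer was saved again separately. A rough before/after of the stored shape, reusing the weather-and-time data from the tests below; the final-answer text here is invented for illustration:

```typescript
// Before: one saveContext entry per tool result, plus the final answer (3 writes).
const before = [
  { input: 'Get weather and time', output: 'Tool: weather, Input: {"location":"New York"}, Result: Sunny, 72°F' },
  { input: 'Get weather and time', output: 'Tool: time, Input: {"timezone":"EST"}, Result: 14:30' },
  { input: 'Get weather and time', output: 'Sunny and 72°F; the time is 14:30.' },
];

// After: a single entry whose output embeds the same tool context (1 write).
const after = [
  {
    input: 'Get weather and time',
    output:
      '[Used tools: Tool: weather, Input: {"location":"New York"}, Result: Sunny, 72°F; ' +
      'Tool: time, Input: {"timezone":"EST"}, Result: 14:30] Sunny and 72°F; the time is 14:30.',
  },
];

console.log(before.length, after.length); // 3 vs 1
```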
@@ -3,7 +3,7 @@ import { HumanMessage, AIMessage, SystemMessage, trimMessages } from '@langchain/core/messages';
import { mock } from 'jest-mock-extended';
import type { BaseChatMemory } from 'langchain/memory';

import { loadMemory, saveToMemory, saveToolResultsToMemory } from '../memoryManagement';
import { loadMemory, saveToMemory, buildToolContext } from '../memoryManagement';
import type { ToolCallData } from '../types';

jest.mock('@langchain/core/messages', () => ({
@@ -134,10 +134,9 @@ });
});
});

describe('saveToolResultsToMemory', () => {
it('should save tool results to memory', async () => {
const input = 'Calculate 2+2';
const toolResults: ToolCallData[] = [
describe('buildToolContext', () => {
it('should build tool context string from single step', () => {
const steps: ToolCallData[] = [
{
action: {
tool: 'calculator',
@@ -150,19 +149,13 @@
},
];

await saveToolResultsToMemory(input, toolResults, mockMemory);
const result = buildToolContext(steps);

expect(mockMemory.saveContext).toHaveBeenCalledWith(
{ input },
{
output: 'Tool: calculator, Input: {"expression":"2+2"}, Result: 4',
},
);
expect(result).toBe('Tool: calculator, Input: {"expression":"2+2"}, Result: 4');
});

it('should save multiple tool results', async () => {
const input = 'Get weather and time';
const toolResults: ToolCallData[] = [
it('should build tool context string from multiple steps', () => {
const steps: ToolCallData[] = [
{
action: {
tool: 'weather',
@@ -185,57 +178,21 @@
},
];

await saveToolResultsToMemory(input, toolResults, mockMemory);
const result = buildToolContext(steps);

expect(mockMemory.saveContext).toHaveBeenCalledTimes(2);
expect(mockMemory.saveContext).toHaveBeenNthCalledWith(
1,
{ input },
{
output: 'Tool: weather, Input: {"location":"New York"}, Result: Sunny, 72°F',
},
);
expect(mockMemory.saveContext).toHaveBeenNthCalledWith(
2,
{ input },
{
output: 'Tool: time, Input: {"timezone":"EST"}, Result: 14:30',
},
expect(result).toBe(
'Tool: weather, Input: {"location":"New York"}, Result: Sunny, 72°F; Tool: time, Input: {"timezone":"EST"}, Result: 14:30',
);
});

it('should not save when memory is not provided', async () => {
const input = 'Calculate 2+2';
const toolResults: ToolCallData[] = [
{
action: {
tool: 'calculator',
toolInput: { expression: '2+2' },
log: 'Using calculator',
toolCallId: 'call_123',
type: 'tool_call',
},
observation: '4',
},
];

await saveToolResultsToMemory(input, toolResults, undefined);

expect(mockMemory.saveContext).not.toHaveBeenCalled();
});

it('should not save when toolResults is empty', async () => {
const input = 'Calculate 2+2';
const toolResults: ToolCallData[] = [];

await saveToolResultsToMemory(input, toolResults, mockMemory);
it('should return empty string for empty steps array', () => {
const result = buildToolContext([]);

expect(mockMemory.saveContext).not.toHaveBeenCalled();
expect(result).toBe('');
});

it('should handle complex tool inputs', async () => {
const input = 'Search for information';
const toolResults: ToolCallData[] = [
it('should handle complex tool inputs', () => {
const steps: ToolCallData[] = [
{
action: {
tool: 'search',
@@ -252,14 +209,10 @@
},
];

await saveToolResultsToMemory(input, toolResults, mockMemory);
const result = buildToolContext(steps);

expect(mockMemory.saveContext).toHaveBeenCalledWith(
{ input },
{
output:
'Tool: search, Input: {"query":"typescript testing","filters":{"language":"en","date":"2024"},"limit":10}, Result: Found 10 results',
},
expect(result).toBe(
'Tool: search, Input: {"query":"typescript testing","filters":{"language":"en","date":"2024"},"limit":10}, Result: Found 10 results',
);
});
});