Skip to content
Merged
3 changes: 2 additions & 1 deletion frontend/lib/chat/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ const mcpClient = await createMCPClient({

# Model Configuration

Currently using: `us.anthropic.claude-sonnet-4-20250514-v1:0` via AWS Bedrock
Currently using: `us.anthropic.claude-sonnet-4-20250514-v1:0` via AWS Bedrock for Router and Pipe Agent.
Currently using: `us.anthropic.claude-opus-4-6-v1` via AWS Bedrock for Auditor and TextToSql Agent, and for chart generation.

## Data Flow Example

Expand Down
2 changes: 1 addition & 1 deletion frontend/lib/chat/chart/generator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const chartColors = {
lines: [lfxColors.positive[500], lfxColors.negative[500], lfxColors.brand[300]],
},
};
const model = bedrock('us.anthropic.claude-sonnet-4-20250514-v1:0');
const model = bedrock('us.anthropic.claude-opus-4-6-v1');

export async function generateChartConfig(
results: Result[],
Expand Down
104 changes: 86 additions & 18 deletions frontend/lib/chat/data-copilot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { createAmazonBedrock } from '@ai-sdk/amazon-bedrock';
import { experimental_createMCPClient as createMCPClient, type LanguageModelV1 } from 'ai';
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js';
import type { Pool } from 'pg';
import { ofetch } from 'ofetch';
import type { ChatResponse, IChatResponseDb } from '../../server/repo/chat.repo';
import { ChatRepository } from '../../server/repo/chat.repo';

Expand Down Expand Up @@ -56,14 +57,23 @@ export class DataCopilot {
/** Human-readable overview of tools for router agent decision making */
private toolsOverview: string = '';

/** Tinybird bucketId for the current project — fetched once and reused across all pipe calls */
private bucketId: number | null = null;

/** Tinybird MCP server URL */
private tbMcpUrl: string = '';

/** Amazon Bedrock language model instance */
private model: LanguageModelV1;
/** Amazon Bedrock language model instance for routing and piping agents (Sonnet) */
private sonnetModel: LanguageModelV1;

/** Amazon Bedrock language model instance for text-to-SQL, auditor, and chart agents (Opus) */
private opusModel: LanguageModelV1;

/** Bedrock model identifier for the router and pipe agents (Sonnet) */
private readonly BEDROCK_SONNET_MODEL_ID = 'us.anthropic.claude-sonnet-4-20250514-v1:0';

/** Bedrock model identifier */
private readonly BEDROCK_MODEL_ID = 'us.anthropic.claude-sonnet-4-20250514-v1:0';
/** Bedrock model identifier for the text-to-SQL and auditor agents (Opus) */
private readonly BEDROCK_OPUS_MODEL_ID = 'us.anthropic.claude-opus-4-6-v1';

/** Maximum number of auditor retry attempts */
private readonly MAX_AUDITOR_RETRIES = 1;
Expand All @@ -72,7 +82,8 @@ export class DataCopilot {
private readonly MAX_SQL_RETRIES = 2;

constructor() {
this.model = bedrock(this.BEDROCK_MODEL_ID);
this.sonnetModel = bedrock(this.BEDROCK_SONNET_MODEL_ID);
this.opusModel = bedrock(this.BEDROCK_OPUS_MODEL_ID);
this.tbMcpUrl = `https://mcp.tinybird.co?token=${process.env.NUXT_INSIGHTS_DATA_COPILOT_TINYBIRD_TOKEN}&host=${process.env.NUXT_TINYBIRD_BASE_URL}`;
}

Expand All @@ -88,10 +99,42 @@ export class DataCopilot {
}),
});

this.tbTools = await this.mcpClient.tools({});
const allTools = await this.mcpClient.tools({});

// Filter out tools with empty descriptions — Bedrock rejects them with a validation error
this.tbTools = Object.fromEntries(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is anything important missing a description? Should we report these to Tinybird support?

Object.entries(allTools).filter(([_, tool]: [string, any]) => {
const description =
(tool?.description as string) || (tool?.meta?.description as string) || '';
return description.trim().length > 0;
}),
);

this.buildToolsOverview();
}

/**
 * Fetch and store the Tinybird bucketId for a project.
 * Uses ofetch directly to stay outside the Nuxt server context (no useStorage/createError).
 *
 * `this.bucketId` is cleared before anything else, so every exit path that does not
 * resolve a fresh id (empty project, missing token, failed request, empty result)
 * leaves it as `null` instead of keeping a value resolved by an earlier call.
 *
 * @param project - Project identifier, sent URL-encoded as the `project` query parameter.
 * @returns A promise that resolves once `this.bucketId` has been updated. Errors raised
 *   while building or performing the request are logged and stored as `null`.
 */
private async fetchBucketId(project: string): Promise<void> {
  // Reset first: the early returns below used to leave a previously fetched id in place,
  // which could then be injected into pipe calls for a different project.
  this.bucketId = null;

  if (!project) return;

  // `||` (not `??`) is deliberate: an empty env value should also fall back to the default host.
  const tinybirdBaseUrl =
    process.env.NUXT_TINYBIRD_BASE_URL || 'https://api.us-west-2.aws.tinybird.co';
  const tinybirdToken = process.env.NUXT_INSIGHTS_DATA_COPILOT_TINYBIRD_TOKEN;
  if (!tinybirdToken) return;

  try {
    const response = await ofetch(
      `${tinybirdBaseUrl}/v0/pipes/project_buckets.json?project=${encodeURIComponent(project)}`,
      { headers: { Authorization: `Bearer ${tinybirdToken}` }, timeout: 10000 },
    );
    // Only the first returned row is consulted; a missing row or field yields null.
    this.bucketId = response.data?.[0]?.bucketId ?? null;
  } catch (error) {
    console.error(`[DataCopilot] Failed to fetch bucketId for "${project}":`, error);
    this.bucketId = null;
  }
}

/**
* Build human-readable overview of available tools for the router agent
*/
Expand Down Expand Up @@ -170,10 +213,20 @@ export class DataCopilot {
instructions?: string,
): Promise<void> {
const chatRepo = new ChatRepository(insightsDbPool);

// For now, only the TextToSql and Auditor agents use the Opus model.
// Opus is currently too slow for the Pipe and Router agents, so those stay on Sonnet.
let model: string | undefined = this.BEDROCK_SONNET_MODEL_ID;

if (agent === 'EXECUTE_INSTRUCTIONS') {
model = undefined;
} else if (agent === 'TEXT_TO_SQL' || agent === 'AUDITOR') {
model = this.BEDROCK_OPUS_MODEL_ID;
}
await chatRepo.saveAgentStep({
chatResponseId,
agent,
model: agent === 'EXECUTE_INSTRUCTIONS' ? undefined : this.BEDROCK_MODEL_ID,
model,
response,
inputTokens: response?.usage?.promptTokens || 0,
outputTokens: response?.usage?.completionTokens || 0,
Expand Down Expand Up @@ -207,7 +260,7 @@ export class DataCopilot {
}: Omit<RouterAgentInput, 'toolsOverview' | 'model' | 'tools'>) {
const agent = new RouterAgent();
return agent.execute({
model: this.model,
model: this.sonnetModel,
messages,
tools: this.tbTools,
toolsOverview: this.toolsOverview,
Expand Down Expand Up @@ -248,7 +301,7 @@ export class DataCopilot {

const agent = new TextToSqlAgent();
return agent.execute({
model: this.model,
model: this.opusModel,
messages,
tools: followUpTools,
date,
Expand Down Expand Up @@ -288,12 +341,21 @@ export class DataCopilot {
const followUpTools: Record<string, unknown> = {};
for (const toolName of toolNames) {
if (this.tbTools[toolName]) {
followUpTools[toolName] = this.tbTools[toolName];
const tool = this.tbTools[toolName] as any;
// Wrap execute to inject bucketId into every MCP tool call during planning
followUpTools[toolName] =
!!this.bucketId && this.tbTools[toolName]?.execute
? {
...this.tbTools[toolName],
execute: async (params: any) =>
tool.execute({ bucketId: this.bucketId, ...params }),
}
: tool;
}
}
const agent = new PipeAgent();
return agent.execute({
model: this.model,
model: this.sonnetModel,
messages,
tools: followUpTools,
date,
Expand Down Expand Up @@ -329,7 +391,7 @@ export class DataCopilot {
const dataSummary = generateDataSummary(data);
const agent = new AuditorAgent();
return agent.execute({
model: this.model,
model: this.opusModel,
messages,
originalQuestion,
reformulatedQuestion,
Expand Down Expand Up @@ -606,7 +668,7 @@ export class DataCopilot {
}

// Prepare for retry - add feedback to messages and loop
previousFeedback = auditorResult.feedback_to_router;
previousFeedback = auditorResult.feedback_to_router || undefined;
attemptNumber++;

dataStream.writeData({
Expand Down Expand Up @@ -742,6 +804,12 @@ export class DataCopilot {
const parametersString = JSON.stringify(parameters || {});
const date = new Date().toISOString().slice(0, 10);

// Fetch bucketId once upfront — required by all Tinybird pipes for data partitioning
const project = (parameters as any)?.project;
if (project) {
await this.fetchBucketId(project);
}

// Build messages from conversation history
const { messages, previousWasClarification } = await this.buildMessagesFromConversation(
currentQuestion,
Expand All @@ -768,7 +836,7 @@ export class DataCopilot {
userPrompt: currentQuestion,
inputTokens: 0,
outputTokens: 0,
model: this.BEDROCK_MODEL_ID,
model: this.BEDROCK_SONNET_MODEL_ID,
conversationId: conversationId || '',
routerResponse: RouterDecisionAction.STOP,
routerReason: '',
Expand Down Expand Up @@ -866,7 +934,7 @@ export class DataCopilot {
routerReason: routerOutput.reasoning,
pipeInstructions: undefined,
sqlQuery: undefined,
model: this.BEDROCK_MODEL_ID,
model: this.BEDROCK_SONNET_MODEL_ID,
conversationId: conversationId,
},
insightsDbPool,
Expand Down Expand Up @@ -901,7 +969,7 @@ export class DataCopilot {
clarificationQuestion: routerOutput.clarification_question || undefined,
pipeInstructions: undefined,
sqlQuery: undefined,
model: this.BEDROCK_MODEL_ID,
model: this.BEDROCK_SONNET_MODEL_ID,
conversationId: conversationId,
},
insightsDbPool,
Expand Down Expand Up @@ -1186,7 +1254,7 @@ export class DataCopilot {
// Execute the pipes according to the instructions and combine results (don't stream data yet, auditor will do it)
const pipeExecutionStart = Date.now();
try {
const combinedData = await executePipeInstructions(pipeOutput.instructions);
const combinedData = await executePipeInstructions(pipeOutput.instructions, this.bucketId);
const pipeExecutionTime = (Date.now() - pipeExecutionStart) / 1000;

// Track successful pipe execution step
Expand Down Expand Up @@ -1246,7 +1314,7 @@ export class DataCopilot {
routerReason: routerOutput.reasoning,
pipeInstructions,
sqlQuery,
model: this.BEDROCK_MODEL_ID,
model: this.BEDROCK_SONNET_MODEL_ID,
conversationId: conversationId,
},
insightsDbPool,
Expand Down
8 changes: 6 additions & 2 deletions frontend/lib/chat/instructions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,18 @@ async function executeTinybirdPipe(
}

// Function to execute pipe instructions and combine results
export async function executePipeInstructions(instructions: PipeInstructions): Promise<any[]> {
export async function executePipeInstructions(
instructions: PipeInstructions,
bucketId?: number | null,
): Promise<any[]> {
// Execute the pipes according to the instructions
const pipeResults: Record<string, any[]> = {};

// Execute each pipe with its inputs using TinyBird API
for (const pipeInstruction of instructions.pipes) {
try {
const result = await executeTinybirdPipe(pipeInstruction.name, pipeInstruction.inputs);
const inputs = !!bucketId ? { bucketId, ...pipeInstruction.inputs } : pipeInstruction.inputs;
const result = await executeTinybirdPipe(pipeInstruction.name, inputs);
pipeResults[pipeInstruction.id] = result;
} catch (error) {
console.error(`Error executing pipe ${pipeInstruction.name}:`, error);
Expand Down
15 changes: 9 additions & 6 deletions frontend/lib/chat/prompts/auditor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export const auditorPrompt = (
})
.join('\n');

const dataSection = `\n## TOP ROWS (first ${dataSummary.topRows.length} of ${dataSummary.rowCount} — use these actual values in your summary)\n\`\`\`json\n${JSON.stringify(dataSummary.topRows, null, 2)}\n\`\`\``;

return `You are an Auditor agent that validates whether retrieved data can answer the user's question.

## USER'S QUESTION
Expand All @@ -47,6 +49,7 @@ ${reformulatedQuestion}
## DATA SUMMARY
**Total Rows:** ${dataSummary.rowCount}
**Columns:** ${dataSummary.columns.join(', ')}
**Top Rows:** ${dataSection}

**Column Statistics:**
${statsFormatted}
Expand Down Expand Up @@ -121,12 +124,12 @@ Make a **BINARY decision**: Can this data answer the user's question?

**IF is_valid = true:**
- Set \`is_valid: true\`
- Write a brief \`summary\` (2-3 sentences) for the user:
- What the data shows
- Key findings based on statistics
- Direct answer to their question
- Example: "Commit activity in 2024 ranged from 0 to 453 per day across 12 companies,
with an average of 87 commits daily."
- Write a conversational \`summary\` (1-3 sentences) for the user:
- If possible, write a summary that directly answers the user's question.
- You SHOULD reference actual values from the TOP ROWS data above — name the specific country, person, organization, etc. NEVER guess, infer, or use external knowledge.
- Unknown / null / placeholder entries: If a top row has a null, empty, "Unknown", or placeholder value (e.g. country code "XX", name "null") in a label column, do NOT treat it as a real result. Explain it represents unattributed or anonymous data (e.g. "contributions where the country of origin is unknown"), then identify and state the top row with a real value as the actual answer.
- ✅ Example: "The top contributor country is the United States with 670 contributors. Note: 5,882 contributions have no country attribution and are listed separately as 'Unknown'."
- Write summary in plain text, not markdown.

**IF is_valid = false:**
- Set \`is_valid: false\`
Expand Down
3 changes: 2 additions & 1 deletion frontend/lib/chat/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,10 @@ export const auditorOutputSchema = z.object({
reasoning: z.string().describe('2-3 sentences explaining the validation decision'),
feedback_to_router: z
.string()
.nullable()
.optional()
.describe('If invalid, specific guidance for router to fix the issue'),
summary: z.string().optional().describe('If valid, user-friendly summary of findings'),
summary: z.string().nullable().optional().describe('If valid, user-friendly summary of findings'),
});

// TypeScript types for agent outputs
Expand Down
8 changes: 6 additions & 2 deletions frontend/lib/chat/utils/data-summary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export interface DataSummary {
rowCount: number;
columns: string[];
columnStats: Record<string, ColumnStats>;
topRows: Record<string, unknown>[];
}

export interface ColumnStats {
Expand Down Expand Up @@ -32,8 +33,8 @@ export interface ColumnStats {

/**
* Generate statistical summary of dataset
* Token-efficient: ~400-500 tokens for typical dataset
* No raw data samples sent to LLM - only statistics
* Token-efficient: ~1500-2000 tokens for typical dataset
* Top rows of raw data sent to LLM + statistics
*
* @param data - Array of data rows
* @returns Statistical summary optimized for auditor validation
Expand All @@ -44,6 +45,7 @@ export function generateDataSummary<T extends Record<string, unknown>>(data: T[]
rowCount: 0,
columns: [],
columnStats: {},
topRows: [],
};
}

Expand Down Expand Up @@ -127,9 +129,11 @@ export function generateDataSummary<T extends Record<string, unknown>>(data: T[]
columnStats[col] = stats;
}

const rows = data as Record<string, unknown>[];
return {
rowCount: data.length,
columns,
columnStats,
topRows: rows.slice(0, 3),
};
}
6 changes: 5 additions & 1 deletion frontend/server/api/chat/chart.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { Pool } from 'pg';
import { generateChartConfig, modifyChartConfig } from '../../../lib/chat/chart/generator';
import { ChatRepository } from '../../repo/chat.repo';
import { Result, Config, DataMapping } from '../../../lib/chat/chart/types';
import { getBucketIdForProject } from '../../data/tinybird/bucket-cache';
import { fetchFromTinybird } from '../../data/tinybird/tinybird';
import { PipeInstructions } from '~~/lib/chat/types';

export const maxDuration = 30;
Expand Down Expand Up @@ -59,7 +61,9 @@ export default defineEventHandler(async (event): Promise<ChartConfigResponse | E
const { executePipeInstructions } = await import('../../../lib/chat/instructions');

try {
const executedResults = await executePipeInstructions(pipeInstructions);
const project = pipeInstructions.pipes[0]?.inputs?.project as string | undefined;
const bucketId = project ? await getBucketIdForProject(project, fetchFromTinybird) : null;
const executedResults = await executePipeInstructions(pipeInstructions, bucketId);

if (!userQuery) {
return createError({
Expand Down