diff --git a/frontend/lib/chat/Readme.md b/frontend/lib/chat/Readme.md index 07448e942..620246de1 100644 --- a/frontend/lib/chat/Readme.md +++ b/frontend/lib/chat/Readme.md @@ -268,7 +268,8 @@ const mcpClient = await createMCPClient({ # Model Configuration -Currently using: `us.anthropic.claude-sonnet-4-20250514-v1:0` via AWS Bedrock +Currently using: `us.anthropic.claude-sonnet-4-20250514-v1:0` via AWS Bedrock for Router and Pipe Agent. +Currently using: `us.anthropic.claude-opus-4-6-v1` via AWS Bedrock for Auditor and TextToSql Agent. ## Data Flow Example diff --git a/frontend/lib/chat/chart/generator.ts b/frontend/lib/chat/chart/generator.ts index d7f35d7b4..734a65abb 100644 --- a/frontend/lib/chat/chart/generator.ts +++ b/frontend/lib/chat/chart/generator.ts @@ -43,7 +43,7 @@ const chartColors = { lines: [lfxColors.positive[500], lfxColors.negative[500], lfxColors.brand[300]], }, }; -const model = bedrock('us.anthropic.claude-sonnet-4-20250514-v1:0'); +const model = bedrock('us.anthropic.claude-opus-4-6-v1'); export async function generateChartConfig( results: Result[], diff --git a/frontend/lib/chat/data-copilot.ts b/frontend/lib/chat/data-copilot.ts index 388e52f26..423b8b9a4 100644 --- a/frontend/lib/chat/data-copilot.ts +++ b/frontend/lib/chat/data-copilot.ts @@ -5,6 +5,7 @@ import { createAmazonBedrock } from '@ai-sdk/amazon-bedrock'; import { experimental_createMCPClient as createMCPClient, type LanguageModelV1 } from 'ai'; import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'; import type { Pool } from 'pg'; +import { ofetch } from 'ofetch'; import type { ChatResponse, IChatResponseDb } from '../../server/repo/chat.repo'; import { ChatRepository } from '../../server/repo/chat.repo'; @@ -56,14 +57,23 @@ export class DataCopilot { /** Human-readable overview of tools for router agent decision making */ private toolsOverview: string = ''; + /** Tinybird bucketId for the current project — fetched once and reused across all pipe calls */ + private bucketId: number | null = null; + /** Tinybird MCP server URL */ private tbMcpUrl: string = ''; - /** Amazon Bedrock language model instance */ - private model: LanguageModelV1; + /** Amazon Bedrock language model instance for routing and piping agents (Sonnet) */ + private sonnetModel: LanguageModelV1; + + /** Amazon Bedrock language model instance for text-to-SQL, auditor, and chart agents (Opus) */ + private opusModel: LanguageModelV1; + + /** Bedrock model identifier for general agents */ + private readonly BEDROCK_SONNET_MODEL_ID = 'us.anthropic.claude-sonnet-4-20250514-v1:0'; - /** Bedrock model identifier */ - private readonly BEDROCK_MODEL_ID = 'us.anthropic.claude-sonnet-4-20250514-v1:0'; + /** Bedrock model identifier for text-to-SQL and pipe agent */ + private readonly BEDROCK_OPUS_MODEL_ID = 'us.anthropic.claude-opus-4-6-v1'; /** Maximum number of auditor retry attempts */ private readonly MAX_AUDITOR_RETRIES = 1; @@ -72,7 +82,8 @@ export class DataCopilot { private readonly MAX_SQL_RETRIES = 2; constructor() { - this.model = bedrock(this.BEDROCK_MODEL_ID); + this.sonnetModel = bedrock(this.BEDROCK_SONNET_MODEL_ID); + this.opusModel = bedrock(this.BEDROCK_OPUS_MODEL_ID); this.tbMcpUrl = `https://mcp.tinybird.co?token=${process.env.NUXT_INSIGHTS_DATA_COPILOT_TINYBIRD_TOKEN}&host=${process.env.NUXT_TINYBIRD_BASE_URL}`; } @@ -88,10 +99,42 @@ export class DataCopilot { }), }); - this.tbTools = await this.mcpClient.tools({}); + const allTools = await this.mcpClient.tools({}); + + // Filter out tools with empty descriptions — Bedrock rejects them with a validation error + this.tbTools = Object.fromEntries( + Object.entries(allTools).filter(([_, tool]: [string, any]) => { + const description = + (tool?.description as string) || (tool?.meta?.description as string) || ''; + return description.trim().length > 0; + }), + ); + this.buildToolsOverview(); } + /** + * Fetch and store the Tinybird bucketId for a project. + * Uses ofetch directly to stay outside the Nuxt server context (no useStorage/createError). + */ + private async fetchBucketId(project: string): Promise { + if (!project) return; + const tinybirdBaseUrl = + process.env.NUXT_TINYBIRD_BASE_URL || 'https://api.us-west-2.aws.tinybird.co'; + const tinybirdToken = process.env.NUXT_INSIGHTS_DATA_COPILOT_TINYBIRD_TOKEN; + if (!tinybirdToken) return; + try { + const response = await ofetch( + `${tinybirdBaseUrl}/v0/pipes/project_buckets.json?project=${encodeURIComponent(project)}`, + { headers: { Authorization: `Bearer ${tinybirdToken}` }, timeout: 10000 }, + ); + this.bucketId = response.data?.[0]?.bucketId ?? null; + } catch (error) { + console.error(`[DataCopilot] Failed to fetch bucketId for "${project}":`, error); + this.bucketId = null; + } + } + /** * Build human-readable overview of available tools for the router agent */ @@ -170,10 +213,20 @@ export class DataCopilot { instructions?: string, ): Promise { const chatRepo = new ChatRepository(insightsDbPool); + + // For now, we use the Opus model for TextToSql and Auditor Agents. + // The model is currently too slow for both the Pipe and Router agents. + let model: string | undefined = this.BEDROCK_SONNET_MODEL_ID; + + if (agent === 'EXECUTE_INSTRUCTIONS') { + model = undefined; + } else if (agent === 'TEXT_TO_SQL' || agent === 'AUDITOR') { + model = this.BEDROCK_OPUS_MODEL_ID; + } await chatRepo.saveAgentStep({ chatResponseId, agent, - model: agent === 'EXECUTE_INSTRUCTIONS' ? undefined : this.BEDROCK_MODEL_ID, + model, response, inputTokens: response?.usage?.promptTokens || 0, outputTokens: response?.usage?.completionTokens || 0, @@ -207,7 +260,7 @@ export class DataCopilot { }: Omit) { const agent = new RouterAgent(); return agent.execute({ - model: this.model, + model: this.sonnetModel, messages, tools: this.tbTools, toolsOverview: this.toolsOverview, @@ -248,7 +301,7 @@ export class DataCopilot { const agent = new TextToSqlAgent(); return agent.execute({ - model: this.model, + model: this.opusModel, messages, tools: followUpTools, date, @@ -288,12 +341,21 @@ export class DataCopilot { const followUpTools: Record = {}; for (const toolName of toolNames) { if (this.tbTools[toolName]) { - followUpTools[toolName] = this.tbTools[toolName]; + const tool = this.tbTools[toolName] as any; + // Wrap execute to inject bucketId into every MCP tool call during planning + followUpTools[toolName] = + !!this.bucketId && this.tbTools[toolName]?.execute + ? { + ...this.tbTools[toolName], + execute: async (params: any) => + tool.execute({ bucketId: this.bucketId, ...params }), + } + : tool; } } const agent = new PipeAgent(); return agent.execute({ - model: this.model, + model: this.sonnetModel, messages, tools: followUpTools, date, @@ -329,7 +391,7 @@ export class DataCopilot { const dataSummary = generateDataSummary(data); const agent = new AuditorAgent(); return agent.execute({ - model: this.model, + model: this.opusModel, messages, originalQuestion, reformulatedQuestion, @@ -606,7 +668,7 @@ export class DataCopilot { } // Prepare for retry - add feedback to messages and loop - previousFeedback = auditorResult.feedback_to_router; + previousFeedback = auditorResult.feedback_to_router || undefined; attemptNumber++; dataStream.writeData({ @@ -742,6 +804,12 @@ export class DataCopilot { const parametersString = JSON.stringify(parameters || {}); const date = new Date().toISOString().slice(0, 10); + // Fetch bucketId once upfront — required by all Tinybird pipes for data partitioning + const project = (parameters as any)?.project; + if (project) { + await this.fetchBucketId(project); + } + // Build messages from conversation history const { messages, previousWasClarification } = await this.buildMessagesFromConversation( currentQuestion, @@ -768,7 +836,7 @@ export class DataCopilot { userPrompt: currentQuestion, inputTokens: 0, outputTokens: 0, - model: this.BEDROCK_MODEL_ID, + model: this.BEDROCK_SONNET_MODEL_ID, conversationId: conversationId || '', routerResponse: RouterDecisionAction.STOP, routerReason: '', @@ -866,7 +934,7 @@ export class DataCopilot { routerReason: routerOutput.reasoning, pipeInstructions: undefined, sqlQuery: undefined, - model: this.BEDROCK_MODEL_ID, + model: this.BEDROCK_SONNET_MODEL_ID, conversationId: conversationId, }, insightsDbPool, @@ -901,7 +969,7 @@ export class DataCopilot { clarificationQuestion: routerOutput.clarification_question || undefined, pipeInstructions: undefined, sqlQuery: undefined, - model: this.BEDROCK_MODEL_ID, + model: this.BEDROCK_SONNET_MODEL_ID, conversationId: conversationId, }, insightsDbPool, @@ -1186,7 +1254,7 @@ export class DataCopilot { // Execute the pipes according to the instructions and combine results (don't stream data yet, auditor will do it) const pipeExecutionStart = Date.now(); try { - const combinedData = await executePipeInstructions(pipeOutput.instructions); + const combinedData = await executePipeInstructions(pipeOutput.instructions, this.bucketId); const pipeExecutionTime = (Date.now() - pipeExecutionStart) / 1000; // Track successful pipe execution step @@ -1246,7 +1314,7 @@ export class DataCopilot { routerReason: routerOutput.reasoning, pipeInstructions, sqlQuery, - model: this.BEDROCK_MODEL_ID, + model: this.BEDROCK_SONNET_MODEL_ID, conversationId: conversationId, }, insightsDbPool, diff --git a/frontend/lib/chat/instructions.ts b/frontend/lib/chat/instructions.ts index 6a292ae83..b65425056 100644 --- a/frontend/lib/chat/instructions.ts +++ b/frontend/lib/chat/instructions.ts @@ -44,14 +44,18 @@ async function executeTinybirdPipe( } // Function to execute pipe instructions and combine results -export async function executePipeInstructions(instructions: PipeInstructions): Promise { +export async function executePipeInstructions( + instructions: PipeInstructions, + bucketId?: number | null, +): Promise { // Execute the pipes according to the instructions const pipeResults: Record = {}; // Execute each pipe with its inputs using TinyBird API for (const pipeInstruction of instructions.pipes) { try { - const result = await executeTinybirdPipe(pipeInstruction.name, pipeInstruction.inputs); + const inputs = !!bucketId ? { bucketId, ...pipeInstruction.inputs } : pipeInstruction.inputs; + const result = await executeTinybirdPipe(pipeInstruction.name, inputs); pipeResults[pipeInstruction.id] = result; } catch (error) { console.error(`Error executing pipe ${pipeInstruction.name}:`, error); diff --git a/frontend/lib/chat/prompts/auditor.ts b/frontend/lib/chat/prompts/auditor.ts index de1b0cd29..a49979735 100644 --- a/frontend/lib/chat/prompts/auditor.ts +++ b/frontend/lib/chat/prompts/auditor.ts @@ -36,6 +36,8 @@ export const auditorPrompt = ( }) .join('\n'); + const dataSection = `\n## TOP ROWS (first ${dataSummary.topRows.length} of ${dataSummary.rowCount} — use these actual values in your summary)\n\`\`\`json\n${JSON.stringify(dataSummary.topRows, null, 2)}\n\`\`\``; + return `You are an Auditor agent that validates whether retrieved data can answer the user's question. ## USER'S QUESTION @@ -47,6 +49,7 @@ ${reformulatedQuestion} ## DATA SUMMARY **Total Rows:** ${dataSummary.rowCount} **Columns:** ${dataSummary.columns.join(', ')} +**Top Rows:** ${dataSection} **Column Statistics:** ${statsFormatted} @@ -121,12 +124,12 @@ Make a **BINARY decision**: Can this data answer the user's question? **IF is_valid = true:** - Set \`is_valid: true\` -- Write a brief \`summary\` (2-3 sentences) for the user: - - What the data shows - - Key findings based on statistics - - Direct answer to their question - - Example: "Commit activity in 2024 ranged from 0 to 453 per day across 12 companies, - with an average of 87 commits daily." +- Write a conversational \`summary\` (1-3 sentences) for the user: + - If possible, write a summary that directly answers the user's question. + - You SHOULD reference actual values from the TOP ROWS data above — name the specific country, person, organization, etc. NEVER guess, infer, or use external knowledge. + - Unknown / null / placeholder entries: If a top row has a null, empty, "Unknown", or placeholder value (e.g. country code "XX", name "null") in a label column, do NOT treat it as a real result. Explain it represents unattributed or anonymous data (e.g. "contributions where the country of origin is unknown"), then identify and state the top row with a real value as the actual answer. + - ✅ Example: "The top contributor country is the United States with 670 contributors. Note: 5,882 contributions have no country attribution and are listed separately as 'Unknown'." + - Write summary in plain text, not markdown. **IF is_valid = false:** - Set \`is_valid: false\` diff --git a/frontend/lib/chat/types.ts b/frontend/lib/chat/types.ts index 80929c1d5..4c62cf799 100644 --- a/frontend/lib/chat/types.ts +++ b/frontend/lib/chat/types.ts @@ -120,9 +120,10 @@ export const auditorOutputSchema = z.object({ reasoning: z.string().describe('2-3 sentences explaining the validation decision'), feedback_to_router: z .string() + .nullable() .optional() .describe('If invalid, specific guidance for router to fix the issue'), - summary: z.string().optional().describe('If valid, user-friendly summary of findings'), + summary: z.string().nullable().optional().describe('If valid, user-friendly summary of findings'), }); // TypeScript types for agent outputs diff --git a/frontend/lib/chat/utils/data-summary.ts b/frontend/lib/chat/utils/data-summary.ts index c22f4f810..cf1d548c8 100644 --- a/frontend/lib/chat/utils/data-summary.ts +++ b/frontend/lib/chat/utils/data-summary.ts @@ -5,6 +5,7 @@ export interface DataSummary { rowCount: number; columns: string[]; columnStats: Record; + topRows: Record[]; } export interface ColumnStats { @@ -32,8 +33,8 @@ export interface ColumnStats { /** * Generate statistical summary of dataset - * Token-efficient: ~400-500 tokens for typical dataset - * No raw data samples sent to LLM - only statistics + * Token-efficient: ~1500-2000 tokens for typical dataset + * Top rows of raw data sent to LLM + statistics * * @param data - Array of data rows * @returns Statistical summary optimized for auditor validation @@ -44,6 +45,7 @@ export function generateDataSummary>(data: T[] rowCount: 0, columns: [], columnStats: {}, + topRows: [], }; } @@ -127,9 +129,11 @@ export function generateDataSummary>(data: T[] columnStats[col] = stats; } + const rows = data as Record[]; return { rowCount: data.length, columns, columnStats, + topRows: rows.slice(0, 3), }; } diff --git a/frontend/server/api/chat/chart.ts b/frontend/server/api/chat/chart.ts index 198aac118..64c9759fc 100644 --- a/frontend/server/api/chat/chart.ts +++ b/frontend/server/api/chat/chart.ts @@ -4,6 +4,8 @@ import { Pool } from 'pg'; import { generateChartConfig, modifyChartConfig } from '../../../lib/chat/chart/generator'; import { ChatRepository } from '../../repo/chat.repo'; import { Result, Config, DataMapping } from '../../../lib/chat/chart/types'; +import { getBucketIdForProject } from '../../data/tinybird/bucket-cache'; +import { fetchFromTinybird } from '../../data/tinybird/tinybird'; import { PipeInstructions } from '~~/lib/chat/types'; export const maxDuration = 30; @@ -59,7 +61,9 @@ export default defineEventHandler(async (event): Promise