From 68050991a789007eb71c0882e6eab38204713c09 Mon Sep 17 00:00:00 2001 From: kevinkim-ogp Date: Wed, 5 Nov 2025 15:20:00 +0800 Subject: [PATCH 1/5] feat: add api for chat streaming --- .../src/routes/api/__tests__/chat.test.ts | 302 ++++++++++++++++++ packages/backend/src/routes/api/chat.ts | 182 +++++++++++ packages/backend/src/routes/api/index.ts | 3 + 3 files changed, 487 insertions(+) create mode 100644 packages/backend/src/routes/api/__tests__/chat.test.ts create mode 100644 packages/backend/src/routes/api/chat.ts diff --git a/packages/backend/src/routes/api/__tests__/chat.test.ts b/packages/backend/src/routes/api/__tests__/chat.test.ts new file mode 100644 index 000000000..68f3a5631 --- /dev/null +++ b/packages/backend/src/routes/api/__tests__/chat.test.ts @@ -0,0 +1,302 @@ +import type { Request, Response } from 'express' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +import type Context from '@/types/express/context' + +const mocks = vi.hoisted(() => ({ + getAuthenticatedContext: vi.fn(), + getLdFlagValue: vi.fn(), + langfuseClient: { + prompt: { + get: vi.fn(), + }, + }, + startActiveObservation: vi.fn(), + streamText: vi.fn(), +})) + +vi.mock('../middleware/authentication', () => ({ + getAuthenticatedContext: mocks.getAuthenticatedContext, +})) + +vi.mock('@/helpers/launch-darkly', () => ({ + getLdFlagValue: mocks.getLdFlagValue, +})) + +vi.mock('@/helpers/langfuse', () => ({ + langfuseClient: mocks.langfuseClient, +})) + +vi.mock('@langfuse/tracing', () => ({ + startActiveObservation: mocks.startActiveObservation, +})) + +vi.mock('ai', () => ({ + convertToModelMessages: vi.fn((msgs) => msgs), + smoothStream: vi.fn(() => ({})), + streamText: mocks.streamText, +})) + +vi.mock('@/helpers/logger', () => ({ + default: { + info: vi.fn(), + error: vi.fn(), + }, +})) + +vi.mock('@/helpers/pair', () => ({ + model: {}, + MODEL_TYPE: 'test-model', +})) + +vi.mock('@/config/app', () => ({ + default: { + appEnv: 'test', + }, +})) + +// Helper function to get and execute the POST handler from the chat router +async function executeChatPostHandler( + req: Partial, + res: Partial, +) { + const chatModule = await import('../chat') + const router = chatModule.default + + // Extract the POST handler + const postHandler = (router as any).stack.find( + (layer: any) => layer.route?.methods?.post, + )?.route?.stack[0]?.handle + + if (!postHandler) { + throw new Error('POST handler not found in chat router') + } + + return postHandler(req, res) +} + +describe('Chat Route Authentication', () => { + let mockReq: Partial + let mockRes: Partial + + beforeEach(() => { + mockReq = { + body: { + messages: [ + { + role: 'user', + parts: [{ type: 'text', text: 'Hello' }], + }, + ], + }, + context: { + currentUser: { + id: 'test-user-id', + email: 'test@plumber.gov.sg', + } as any, + isAdminOperation: false, + } as any, + } as Partial + + mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + headersSent: false, + end: vi.fn(), + } as Partial + + // Reset mocks + vi.clearAllMocks() + }) + + afterEach(() => { + vi.restoreAllMocks() + }) + + describe('Authentication Requirements', () => { + it('should call getAuthenticatedContext on every request', async () => { + const mockContext: Context = { + req: mockReq as Request, + res: mockRes as Response, + currentUser: { + id: 'test-user-id', + email: 'test@plumber.gov.sg', + } as any, + isAdminOperation: false, + } + + mocks.getAuthenticatedContext.mockReturnValueOnce(mockContext) + mocks.getLdFlagValue.mockResolvedValueOnce({ + chatPrompt: 'aids-chat-v0', + version: 'production', + }) + mocks.langfuseClient.prompt.get.mockResolvedValueOnce({ + prompt: 'test prompt', + }) + + // Mock streamText to return a mock result + const mockResult = { + pipeUIMessageStreamToResponse: vi.fn(), + } + mocks.startActiveObservation.mockImplementationOnce( + async (name, callback) => { + const mockTrace = { + updateTrace: vi.fn(), + startObservation: vi.fn(() => ({ + update: vi.fn(), + })), + update: vi.fn(), + traceId: 'test-trace-id', + } + return await callback(mockTrace) + }, + ) + mocks.streamText.mockResolvedValueOnce(mockResult) + + await executeChatPostHandler(mockReq, mockRes) + + expect(mocks.getAuthenticatedContext).toHaveBeenCalledWith(mockReq) + }) + + it('should throw error when user is not authenticated', async () => { + mocks.getAuthenticatedContext.mockImplementationOnce(() => { + throw new Error('User must be authenticated') + }) + + await expect(executeChatPostHandler(mockReq, mockRes)).rejects.toThrow( + 'User must be authenticated', + ) + + expect(mocks.getAuthenticatedContext).toHaveBeenCalledWith(mockReq) + // Should not reach getLdFlagValue since authentication failed + expect(mocks.getLdFlagValue).not.toHaveBeenCalled() + }) + + it('should use authenticated user email for feature flag lookup', async () => { + const mockContext: Context = { + req: mockReq as Request, + res: mockRes as Response, + currentUser: { + id: 'test-user-id', + email: 'feature-test@plumber.gov.sg', + } as any, + isAdminOperation: false, + } + + mocks.getAuthenticatedContext.mockReturnValueOnce(mockContext) + mocks.getLdFlagValue.mockResolvedValueOnce({ + chatPrompt: 'aids-chat-v0', + version: 'production', + }) + mocks.langfuseClient.prompt.get.mockResolvedValueOnce({ + prompt: 'test prompt', + }) + + const mockResult = { + pipeUIMessageStreamToResponse: vi.fn(), + } + mocks.startActiveObservation.mockImplementationOnce( + async (name, callback) => { + const mockTrace = { + updateTrace: vi.fn(), + startObservation: vi.fn(() => ({ + update: vi.fn(), + })), + update: vi.fn(), + traceId: 'test-trace-id', + } + return await callback(mockTrace) + }, + ) + mocks.streamText.mockResolvedValueOnce(mockResult) + + await executeChatPostHandler(mockReq, mockRes) + + expect(mocks.getLdFlagValue).toHaveBeenCalledWith( + 'ai-builder-prompt-config', + 'feature-test@plumber.gov.sg', + expect.any(Object), + ) + }) + + it('should validate request body before processing', async () => { + const mockContext: Context = { + req: mockReq as Request, + res: mockRes as Response, + currentUser: { + id: 'test-user-id', + email: 'test@plumber.gov.sg', + } as any, + isAdminOperation: false, + } + + // Invalid request body (empty messages) + mockReq.body = { + messages: [], + } + + mocks.getAuthenticatedContext.mockReturnValueOnce(mockContext) + mocks.getLdFlagValue.mockResolvedValueOnce({ + chatPrompt: 'aids-chat-v0', + version: 'production', + }) + + await executeChatPostHandler(mockReq, mockRes) + + expect(mockRes.status).toHaveBeenCalledWith(400) + expect(mockRes.json).toHaveBeenCalledWith({ + error: 'Messages array is required', + }) + }) + }) + + describe('Admin User Access', () => { + it('should allow admin users to access the endpoint', async () => { + const mockContext: Context = { + req: mockReq as Request, + res: mockRes as Response, + currentUser: { + id: 'admin-user-id', + email: 'admin@plumber.gov.sg', + } as any, + isAdminOperation: true, + } + + mocks.getAuthenticatedContext.mockReturnValueOnce(mockContext) + mocks.getLdFlagValue.mockResolvedValueOnce({ + chatPrompt: 'aids-chat-v0', + version: 'production', + }) + mocks.langfuseClient.prompt.get.mockResolvedValueOnce({ + prompt: 'test prompt', + }) + + const mockResult = { + pipeUIMessageStreamToResponse: vi.fn(), + } + mocks.startActiveObservation.mockImplementationOnce( + async (name, callback) => { + const mockTrace = { + updateTrace: vi.fn(), + startObservation: vi.fn(() => ({ + update: vi.fn(), + })), + update: vi.fn(), + traceId: 'test-trace-id', + } + return await callback(mockTrace) + }, + ) + mocks.streamText.mockResolvedValueOnce(mockResult) + + await executeChatPostHandler(mockReq, mockRes) + + expect(mocks.getAuthenticatedContext).toHaveBeenCalledWith(mockReq) + expect(mocks.getLdFlagValue).toHaveBeenCalledWith( + 'ai-builder-prompt-config', + 'admin@plumber.gov.sg', + expect.any(Object), + ) + }) + }) +}) diff --git a/packages/backend/src/routes/api/chat.ts b/packages/backend/src/routes/api/chat.ts new file mode 100644 index 000000000..17db27edd --- /dev/null +++ b/packages/backend/src/routes/api/chat.ts @@ -0,0 +1,182 @@ +import { startActiveObservation } from '@langfuse/tracing' +import { convertToModelMessages, smoothStream, streamText } from 'ai' +import type { Request, Response } from 'express' +import { Router } from 'express' + +import appConfig from '@/config/app' +import { langfuseClient } from '@/helpers/langfuse' +import { getLdFlagValue } from '@/helpers/launch-darkly' +import logger from '@/helpers/logger' +import { model, MODEL_TYPE } from '@/helpers/pair' + +import { getAuthenticatedContext } from './middleware/authentication' + +interface ChatRequest { + messages: Array<{ + role: 'user' | 'assistant' | 'system' + parts: Array<{ type: 'text'; text: string }> + }> + userId?: string + sessionId?: string +} + +async function handleChatStream(req: Request, res: Response) { + const context = getAuthenticatedContext(req) + + const promptConfig = await getLdFlagValue( + 'ai-builder-prompt-config', + context.currentUser.email, + { + chatPrompt: 'aids-chat-v0', + version: 'production', + }, + ) + + const { chatPrompt, version } = promptConfig + + try { + const { messages: rawMessages, userId, sessionId } = req.body as ChatRequest + + if ( + !rawMessages || + !Array.isArray(rawMessages) || + rawMessages.length === 0 + ) { + res.status(400).json({ error: 'Messages array is required' }) + return + } + + // Convert UIMessages to ModelMessages + const messages = convertToModelMessages(rawMessages as any) + + // Extract last user message text for tracking (simple text-only format) + const lastUserMessage = rawMessages + .filter((m) => m.role === 'user') + .map((m) => m.parts.find((p) => p.type === 'text')?.text || '') + .pop() + + // Get the prompt from Langfuse + const prompt = await langfuseClient.prompt.get(chatPrompt, { + label: version, + }) + + let traceId = '' + let generationId = '' + + // Setup observation and stream using AI SDK + const result = await startActiveObservation( + 'ai-chat-stream', + async (trace) => { + trace.updateTrace({ + name: 'ai-chat-stream', + sessionId: sessionId || 'unknown', + userId: userId || 'anonymous', + input: { messages, prompt: lastUserMessage }, + tags: ['sse', 'stream', 'rest-api'], + environment: appConfig.appEnv, + }) + + const generation = trace.startObservation( + 'ai-stream-generation', + { + model: MODEL_TYPE, + input: [{ role: 'system', content: prompt.prompt }, ...messages], + }, + { asType: 'generation' }, + ) + + generation.update({ + prompt, + }) + + // Capture IDs for client + traceId = trace.traceId + // @ts-expect-error - observationId exists but not in types + generationId = generation.observationId + + return streamText({ + model, + messages: [{ role: 'system', content: prompt.prompt }, ...messages], + experimental_transform: smoothStream({ + chunking: 'word', // Stream word-by-word for typing effect + }), + experimental_telemetry: { + isEnabled: true, + }, + onFinish: (event) => { + logger.info('Stream finished', { + traceId, + generationId, + textLength: event.text.length, + }) + + generation + .update({ + output: event.text, + usageDetails: { + input: event.usage.inputTokens, + output: event.usage.outputTokens, + total: event.usage.totalTokens, + }, + }) + .end() + + trace.update({ + output: { result: event.text }, + level: 'DEFAULT', + }) + }, + onError: (error) => { + const errorMessage = + error instanceof Error ? error.message : String(error) + + logger.error('Error generating chat response', { + traceId, + generationId, + error: errorMessage, + }) + + generation + .update({ + output: errorMessage, + level: 'ERROR', + }) + .end() + + trace.update({ + output: { error: errorMessage }, + level: 'ERROR', + }) + }, + }) + }, + ) + + // Pipe the UI message stream to Express response + // This uses the data stream protocol that DefaultChatTransport expects + result.pipeUIMessageStreamToResponse(res, { + headers: { + ...(traceId && { 'X-Trace-Id': traceId }), + ...(generationId && { 'X-Generation-Id': generationId }), + }, + }) + } catch (error) { + logger.error('Error in chat stream', { error }) + + const errorMessage = + error instanceof Error ? error.message : 'Unknown error occurred' + + // If headers haven't been sent yet, send error response + if (!res.headersSent) { + res.status(500).json({ error: errorMessage }) + } else { + res.end() + } + } +} + +const router = Router() + +router.post('/', handleChatStream) + +export default router diff --git a/packages/backend/src/routes/api/index.ts b/packages/backend/src/routes/api/index.ts index c74e04a30..bf8022134 100644 --- a/packages/backend/src/routes/api/index.ts +++ b/packages/backend/src/routes/api/index.ts @@ -1,8 +1,11 @@ import { Router } from 'express' +import chatRouter from './chat' + const router = Router() // Mount individual API routes +router.use('/chat', chatRouter) // Future routes can be added here: // router.use('/users', usersRouter) From 7c2d62012c3715876e91cebe3db8c7cfad9f5689 Mon Sep 17 00:00:00 2001 From: kevinkim-ogp Date: Wed, 5 Nov 2025 15:40:09 +0800 Subject: [PATCH 2/5] chore: update tags --- packages/backend/src/routes/api/chat.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend/src/routes/api/chat.ts b/packages/backend/src/routes/api/chat.ts index 17db27edd..043529b91 100644 --- a/packages/backend/src/routes/api/chat.ts +++ b/packages/backend/src/routes/api/chat.ts @@ -72,7 +72,7 @@ async function handleChatStream(req: Request, res: Response) { sessionId: sessionId || 'unknown', userId: userId || 'anonymous', input: { messages, prompt: lastUserMessage }, - tags: ['sse', 'stream', 'rest-api'], + tags: ['stream', 'rest-api'], environment: appConfig.appEnv, }) From cc6e76986670aab776a9e074ae9bc20124bdc88c Mon Sep 17 00:00:00 2001 From: kevinkim-ogp Date: Thu, 6 Nov 2025 10:05:47 +0800 Subject: [PATCH 3/5] chore: apply middleware to all api routes --- packages/backend/src/routes/api/index.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/packages/backend/src/routes/api/index.ts b/packages/backend/src/routes/api/index.ts index bf8022134..0d42c386f 100644 --- a/packages/backend/src/routes/api/index.ts +++ b/packages/backend/src/routes/api/index.ts @@ -1,9 +1,18 @@ import { Router } from 'express' +import { + requireAuthentication, + setCurrentUserContext, +} from './middleware/authentication' import chatRouter from './chat' const router = Router() +// Apply authentication middleware to ALL API routes +// This mirrors how GraphQL handles authentication via context +router.use(setCurrentUserContext) +router.use(requireAuthentication) + // Mount individual API routes router.use('/chat', chatRouter) From 3f07a83671a7985065c23942072e6e3082e4dba6 Mon Sep 17 00:00:00 2001 From: kevinkim-ogp Date: Fri, 14 Nov 2025 10:51:56 +0800 Subject: [PATCH 4/5] chore: update default prompt LD flag --- packages/backend/src/routes/api/chat.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend/src/routes/api/chat.ts b/packages/backend/src/routes/api/chat.ts index 043529b91..ac54a9b23 100644 --- a/packages/backend/src/routes/api/chat.ts +++ b/packages/backend/src/routes/api/chat.ts @@ -27,7 +27,7 @@ async function handleChatStream(req: Request, res: Response) { 'ai-builder-prompt-config', context.currentUser.email, { - chatPrompt: 'aids-chat-v0', + chatPrompt: 'ai-builder/chat', version: 'production', }, ) From d9d43f78cd1045400f0ebe6fe1aca9f2c17a9d86 Mon Sep 17 00:00:00 2001 From: kevinkim-ogp Date: Thu, 20 Nov 2025 22:39:27 +0800 Subject: [PATCH 5/5] refactor: send traceId, use correct user --- packages/backend/src/routes/api/chat.ts | 37 ++++++++++++------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/packages/backend/src/routes/api/chat.ts b/packages/backend/src/routes/api/chat.ts index ac54a9b23..ff69789db 100644 --- a/packages/backend/src/routes/api/chat.ts +++ b/packages/backend/src/routes/api/chat.ts @@ -35,7 +35,7 @@ async function handleChatStream(req: Request, res: Response) { const { chatPrompt, version } = promptConfig try { - const { messages: rawMessages, userId, sessionId } = req.body as ChatRequest + const { messages: rawMessages, sessionId } = req.body as ChatRequest if ( !rawMessages || @@ -61,7 +61,6 @@ async function handleChatStream(req: Request, res: Response) { }) let traceId = '' - let generationId = '' // Setup observation and stream using AI SDK const result = await startActiveObservation( @@ -70,7 +69,7 @@ async function handleChatStream(req: Request, res: Response) { trace.updateTrace({ name: 'ai-chat-stream', sessionId: sessionId || 'unknown', - userId: userId || 'anonymous', + userId: context.currentUser.email || 'anonymous', input: { messages, prompt: lastUserMessage }, tags: ['stream', 'rest-api'], environment: appConfig.appEnv, @@ -91,8 +90,6 @@ async function handleChatStream(req: Request, res: Response) { // Capture IDs for client traceId = trace.traceId - // @ts-expect-error - observationId exists but not in types - generationId = generation.observationId return streamText({ model, @@ -106,10 +103,14 @@ async function handleChatStream(req: Request, res: Response) { onFinish: (event) => { logger.info('Stream finished', { traceId, - generationId, textLength: event.text.length, }) + trace.update({ + output: { result: event.text }, + level: 'DEFAULT', + }) + generation .update({ output: event.text, @@ -120,11 +121,6 @@ async function handleChatStream(req: Request, res: Response) { }, }) .end() - - trace.update({ - output: { result: event.text }, - level: 'DEFAULT', - }) }, onError: (error) => { const errorMessage = @@ -132,21 +128,20 @@ async function handleChatStream(req: Request, res: Response) { logger.error('Error generating chat response', { traceId, - generationId, error: errorMessage, }) + trace.update({ + output: { error: errorMessage }, + level: 'ERROR', + }) + generation .update({ output: errorMessage, level: 'ERROR', }) .end() - - trace.update({ - output: { error: errorMessage }, - level: 'ERROR', - }) }, }) }, @@ -155,9 +150,11 @@ async function handleChatStream(req: Request, res: Response) { // Pipe the UI message stream to Express response // This uses the data stream protocol that DefaultChatTransport expects result.pipeUIMessageStreamToResponse(res, { - headers: { - ...(traceId && { 'X-Trace-Id': traceId }), - ...(generationId && { 'X-Generation-Id': generationId }), + messageMetadata: () => { + return { + traceId, + model: MODEL_TYPE, + } }, }) } catch (error) {