Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 34 additions & 23 deletions src/components/ChatArea.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { useEffect, useRef } from 'react'
import React from 'react'
import React, { useEffect, useRef } from 'react'
import { UserMessage } from './UserMessage'
import { AgentMessage } from './AgentMessage'
import { FeedbackButtons } from './FeedbackButtons'
Expand All @@ -10,12 +9,14 @@ interface ChatAreaProps {
messages: Array<ChatMessage>
isStreaming: boolean
onSendMessage: (text: string) => void
onStop?: () => void
}

export function ChatArea({
messages,
isStreaming,
onSendMessage,
onStop,
}: ChatAreaProps) {
const scrollRef = useRef<HTMLDivElement>(null)

Expand All @@ -34,47 +35,57 @@ export function ChatArea({
className="flex-1 overflow-y-auto p-4 md:p-6 chat-container pb-0"
>
{messages.map((msg, index) => {
const prevMsg: ChatMessage | undefined = messages[index - 1];
const nextMsg: ChatMessage | undefined = messages[index + 1];
const prevMsg = messages[index - 1] as ChatMessage | undefined
const nextMsg = messages[index + 1] as ChatMessage | undefined

return <React.Fragment key={msg.id}>
{msg.author === 'user'
? <UserMessage message={msg} />
: <AgentMessage
message={msg}
showAvatar={msg.author !== prevMsg?.author}
/>}
{
/* Show thumbs up/down when all below are true:
return (
<React.Fragment key={msg.id}>
{msg.author === 'user' ? (
<UserMessage message={msg} />
) : (
<AgentMessage
message={msg}
showAvatar={msg.author !== prevMsg?.author}
/>
)}
{
/* Show thumbs up/down when all below are true:
- Message has trace id
- Message is not streaming
- Next message has different trace id or doesn't exist
*/
msg.langfuseTraceId &&
!msg.isStreaming &&
(!nextMsg?.langfuseTraceId || msg.langfuseTraceId !== nextMsg.langfuseTraceId) &&
<FeedbackButtons traceId={msg.langfuseTraceId} />
}
</React.Fragment>
msg.langfuseTraceId &&
!msg.isStreaming &&
(!nextMsg?.langfuseTraceId ||
msg.langfuseTraceId !== nextMsg.langfuseTraceId) && (
<FeedbackButtons traceId={msg.langfuseTraceId} />
)
}
</React.Fragment>
)
})}

{
isStreaming && <p className="flex items-center gap-2 text-gray-500 mt-2">
{isStreaming && (
<p className="flex items-center gap-2 text-gray-500 mt-2">
正在思考中
<span className="typing-indicator ml-1">
<span />
<span />
<span />
</span>
</p>
}
)}

{/* Extra space at the bottom */}
<div className="h-4" />
</div>

{/* Input area */}
<ChatInput onSend={onSendMessage} disabled={isStreaming} />
<ChatInput
onSend={onSendMessage}
onStop={onStop}
isStreaming={isStreaming}
/>
</>
)
}
14 changes: 10 additions & 4 deletions src/components/ChatInput.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ import { useCallback, useEffect, useRef, useState } from 'react'

interface ChatInputProps {
onSend: (text: string) => void
onStop?: () => void
isStreaming?: boolean
disabled?: boolean
placeholder?: string
}

export function ChatInput({
onSend,
onStop,
isStreaming,
disabled,
placeholder = '詢問後續問題或要求修改...',
}: ChatInputProps) {
Expand Down Expand Up @@ -45,16 +49,18 @@ export function ChatInput({
handleSubmit()
}
}}
disabled={disabled}
disabled={disabled || isStreaming}
className="w-full bg-transparent border-none focus:ring-0 p-3 pr-12 min-h-[50px] max-h-32 resize-none text-sm rounded-xl"
placeholder={placeholder}
/>
<button
onClick={handleSubmit}
disabled={!value.trim() || disabled}
onClick={isStreaming ? onStop : handleSubmit}
disabled={(!isStreaming && !value.trim()) || disabled}
className="absolute right-2 bottom-2 p-1.5 bg-primary text-black rounded-lg hover:bg-primary-hover transition-colors flex items-center justify-center disabled:opacity-40 disabled:cursor-not-allowed"
>
<span className="material-symbols-outlined text-sm">send</span>
<span className="material-symbols-outlined text-sm">
{isStreaming ? 'stop' : 'send'}
</span>
</button>
</div>
<div className="text-center mt-2">
Expand Down
65 changes: 54 additions & 11 deletions src/lib/chatCache.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { runChat } from './sessions.functions'
import type { QueryClient } from '@tanstack/react-query'
import type {
AdkEvent,
Expand Down Expand Up @@ -65,20 +64,60 @@ export async function startChatStream({
})

try {
const stream = await runChat({
data: {
sessionId,
...payload,
},
const response = await fetch('/api/run-sse', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
signal: controller.signal,
body: JSON.stringify({ sessionId, ...payload }),
})

for await (const event of stream) {
processEventIntoCache(queryClient, sessionId, event)
if (!response.ok) {
throw new Error(`ADK request failed: HTTP ${response.status}`)
}

// Parse the ADK SSE stream directly from response.body.
// Unlike the old runChat server function approach, fetch's reader.read() throws
// AbortError immediately when controller.abort() is called — no intermediate
// layer to swallow it.
const reader = response.body!.getReader()
const decoder = new TextDecoder()
let buffer = ''

try {
while (true) {
const { done, value } = await reader.read()
if (done) break

buffer += decoder.decode(value, { stream: true })
const parts = buffer.split('\n\n')
buffer = parts.pop() ?? ''

for (const part of parts) {
const lines = part.split('\n')
let data = ''
for (const line of lines) {
if (line.startsWith('data: ')) {
data += line.slice(6)
}
}
if (data) {
try {
processEventIntoCache(
queryClient,
sessionId,
JSON.parse(data) as AdkEvent,
)
} catch {
// Skip unparseable events
}
}
}
}
} finally {
reader.cancel()
}
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') {
// Expected when a stream is canceled
if (controller.signal.aborted) {
return
}
const errorMessage = err instanceof Error ? err.message : 'Unknown error'
Expand All @@ -89,9 +128,13 @@ export async function startChatStream({
})
console.error(`SSE stream error for session ${sessionId}:`, err)
} finally {
// Mark all streaming messages as complete
// Mark all streaming messages as complete.
// Guard against the race where a new stream has already started for this
// session (e.g. the user sent a new message while this one was streaming):
// in that case the new stream owns the state and we must not reset it.
queryClient.setQueryData<ChatSessionState>(queryKey, (prev) => {
if (!prev) return INITIAL_CHAT_STATE
if (abortControllers.get(sessionId) !== controller) return prev
return {
...prev,
isStreaming: false,
Expand Down
60 changes: 0 additions & 60 deletions src/lib/sessions.functions.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import { createServerFn } from '@tanstack/react-start'
import { getRequest } from '@tanstack/react-start/server'
import { ADK_APP_NAME, ADK_USER_ID, adkClient } from './adkClient'
import { handleAdkError, handleAdkResponseError } from './adk.server'
import type { AdkEvent } from './adk'
import type { components } from './adk-types'

type RunRequest = components['schemas']['RunAgentRequest']

export const listSessions = createServerFn({ method: 'GET' }).handler(
async () => {
Expand Down Expand Up @@ -67,58 +62,3 @@ export const createSession = createServerFn({ method: 'POST' })
return { ok: true }
})

export type ChatInput = Omit<RunRequest, 'appName' | 'userId' | 'streaming'>

export const runChat = createServerFn({ method: 'POST' })
.inputValidator((data: ChatInput) => data)
.handler(async function* ({ data: input }) {
const body: RunRequest = {
...input,
appName: ADK_APP_NAME,
userId: ADK_USER_ID,
streaming: true,
}

const { response } = await adkClient.POST('/run_sse', {
parseAs: 'stream',
body,
signal: getRequest().signal,
})

if (!response.ok) {
handleAdkResponseError(response)
}

const reader = response.body?.getReader()
if (!reader) throw new Error('No response body from ADK')

const decoder = new TextDecoder()
let buffer = ''

// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
while (true) {
const { done, value } = await reader.read()
if (done) break

buffer += decoder.decode(value, { stream: true })
const parts = buffer.split('\n\n')
buffer = parts.pop() ?? ''

for (const part of parts) {
const lines = part.split('\n')
let data = ''
for (const line of lines) {
if (line.startsWith('data: ')) {
data += line.slice(6)
}
}
if (data) {
try {
yield JSON.parse(data) as AdkEvent
} catch {
// Skip unparseable events
}
}
}
}
})
29 changes: 26 additions & 3 deletions src/routeTree.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import { Route as rootRouteImport } from './routes/__root'
import { Route as AppRouteImport } from './routes/_app'
import { Route as AppIndexRouteImport } from './routes/_app/index'
import { Route as ApiRunSseRouteImport } from './routes/api/run-sse'
import { Route as AppSessionSessionIdRouteImport } from './routes/_app/session.$sessionId'

const AppRoute = AppRouteImport.update({
Expand All @@ -22,6 +23,11 @@ const AppIndexRoute = AppIndexRouteImport.update({
path: '/',
getParentRoute: () => AppRoute,
} as any)
const ApiRunSseRoute = ApiRunSseRouteImport.update({
id: '/api/run-sse',
path: '/api/run-sse',
getParentRoute: () => rootRouteImport,
} as any)
const AppSessionSessionIdRoute = AppSessionSessionIdRouteImport.update({
id: '/session/$sessionId',
path: '/session/$sessionId',
Expand All @@ -30,28 +36,37 @@ const AppSessionSessionIdRoute = AppSessionSessionIdRouteImport.update({

export interface FileRoutesByFullPath {
'/': typeof AppIndexRoute
'/api/run-sse': typeof ApiRunSseRoute
'/session/$sessionId': typeof AppSessionSessionIdRoute
}
export interface FileRoutesByTo {
'/api/run-sse': typeof ApiRunSseRoute
'/': typeof AppIndexRoute
'/session/$sessionId': typeof AppSessionSessionIdRoute
}
export interface FileRoutesById {
__root__: typeof rootRouteImport
'/_app': typeof AppRouteWithChildren
'/api/run-sse': typeof ApiRunSseRoute
'/_app/': typeof AppIndexRoute
'/_app/session/$sessionId': typeof AppSessionSessionIdRoute
}
export interface FileRouteTypes {
fileRoutesByFullPath: FileRoutesByFullPath
fullPaths: '/' | '/session/$sessionId'
fullPaths: '/' | '/api/run-sse' | '/session/$sessionId'
fileRoutesByTo: FileRoutesByTo
to: '/' | '/session/$sessionId'
id: '__root__' | '/_app' | '/_app/' | '/_app/session/$sessionId'
to: '/api/run-sse' | '/' | '/session/$sessionId'
id:
| '__root__'
| '/_app'
| '/api/run-sse'
| '/_app/'
| '/_app/session/$sessionId'
fileRoutesById: FileRoutesById
}
export interface RootRouteChildren {
AppRoute: typeof AppRouteWithChildren
ApiRunSseRoute: typeof ApiRunSseRoute
}

declare module '@tanstack/react-router' {
Expand All @@ -70,6 +85,13 @@ declare module '@tanstack/react-router' {
preLoaderRoute: typeof AppIndexRouteImport
parentRoute: typeof AppRoute
}
'/api/run-sse': {
id: '/api/run-sse'
path: '/api/run-sse'
fullPath: '/api/run-sse'
preLoaderRoute: typeof ApiRunSseRouteImport
parentRoute: typeof rootRouteImport
}
'/_app/session/$sessionId': {
id: '/_app/session/$sessionId'
path: '/session/$sessionId'
Expand All @@ -94,6 +116,7 @@ const AppRouteWithChildren = AppRoute._addFileChildren(AppRouteChildren)

const rootRouteChildren: RootRouteChildren = {
AppRoute: AppRouteWithChildren,
ApiRunSseRoute: ApiRunSseRoute,
}
export const routeTree = rootRouteImport
._addFileChildren(rootRouteChildren)
Expand Down
Loading
Loading